In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 63 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=fe068c8c98809540162a324226c1ec2e1ca7325e8198d508198b4d8f4b02c92f
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySparkDemo").getOrCreate()

In [None]:
df = spark.createDataFrame(
    [
     ('Jack',42),
     ('Jill',37),
     ('John',35),
     ('Jane',48),
    ], ['name','age']
)

In [None]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [None]:
df.show()

+----+---+
|name|age|
+----+---+
|Jack| 42|
|Jill| 37|
|John| 35|
|Jane| 48|
+----+---+



In [None]:
over40 = df.filter(df.age > 40)
over40.show()

+----+---+
|name|age|
+----+---+
|Jack| 42|
|Jane| 48|
+----+---+



In [None]:
rows = over40.count()
print(rows)

2


In [None]:
#CSV

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

df = spark.read.option("header", True).csv("policy.csv")
df.printSchema()

root
 |-- policy: string (nullable = true)
 |-- make: string (nullable = true)
 |-- vehicle_age: string (nullable = true)
 |-- sum_insured: string (nullable = true)
 |-- inception_date: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- premium: string (nullable = true)



In [None]:
df.show(5)

+-------+------+-----------+-----------+--------------+----------+--------+-------+
| policy|  make|vehicle_age|sum_insured|inception_date|start_date|end_date|premium|
+-------+------+-----------+-----------+--------------+----------+--------+-------+
|CAR0001|TOYOTA|          1|      15000|      20180101|  20180101|20181231|   1000|
|CAR0001|TOYOTA|          2|      13500|      20180101|  20190101|20191231|    900|
|CAR0001|TOYOTA|          3|      12000|      20180101|  20200101|20201231|    800|
|CAR0002|SUBARU|          2|      14000|      20200210|  20200210|20210209|    950|
|CAR0003|  FORD|          6|      10000|      20180315|  20180315|20190314|    700|
+-------+------+-----------+-----------+--------------+----------+--------+-------+
only showing top 5 rows



In [None]:
df2 = spark.read.option("header", True).csv("policy.csv") \
      .withColumn("sum_insured", col("sum_insured").cast(IntegerType())) \
      .withColumn("vehicle_age", col("vehicle_age").cast(IntegerType())) \
      .withColumn("premium", col("premium").cast(IntegerType()))

df2.show(1)
df2.printSchema()

+-------+------+-----------+-----------+--------------+----------+--------+-------+
| policy|  make|vehicle_age|sum_insured|inception_date|start_date|end_date|premium|
+-------+------+-----------+-----------+--------------+----------+--------+-------+
|CAR0001|TOYOTA|          1|      15000|      20180101|  20180101|20181231|   1000|
+-------+------+-----------+-----------+--------------+----------+--------+-------+
only showing top 1 row

root
 |-- policy: string (nullable = true)
 |-- make: string (nullable = true)
 |-- vehicle_age: integer (nullable = true)
 |-- sum_insured: integer (nullable = true)
 |-- inception_date: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- premium: integer (nullable = true)



In [None]:
#flights.csv /mon/dom/dow/mile/duration

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType

df = spark.read.option("header", True).csv("flights.csv")
#df.show(1)
df.printSchema()

df2 = spark.read.option("header", True).csv("flights.csv") \
      .withColumn("mon", col("mon").cast(IntegerType())) \
      .withColumn("dom", col("dom").cast(IntegerType())) \
      .withColumn("dow", col("dow").cast(IntegerType())) \
      .withColumn("mile", col("mile").cast(IntegerType())) \
      .withColumn("duration", col("duration").cast(IntegerType()))

df2.show(1)
df2.printSchema()


root
 |-- mon: string (nullable = true)
 |-- dom: string (nullable = true)
 |-- dow: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: string (nullable = true)
 |-- depart: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- delay: string (nullable = true)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351|   NA|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 1 row

root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: string (nullable = true)
 |

In [None]:
#data transformation

from pyspark.sql import SparkSession
from pyspark.sql.functions import col,when,lit,concat,to_date,rank
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("PySparkTutorial").getOrCreate()

policyDF = spark.read.option("header", True).csv("policy.csv") \
      .withColumn("sum_insured", col("sum_insured").cast(IntegerType())) \
      .withColumn("vehicle_age", col("vehicle_age").cast(IntegerType())) \
      .withColumn("premium", col("premium").cast(IntegerType()))

claimsDF = spark.read.option("header", True).csv("claims.csv") \
      .withColumn("cost", col("cost").cast(IntegerType()))

In [None]:
policyDF.printSchema()
claimsDF.printSchema()

root
 |-- policy: string (nullable = true)
 |-- make: string (nullable = true)
 |-- vehicle_age: integer (nullable = true)
 |-- sum_insured: integer (nullable = true)
 |-- inception_date: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- premium: integer (nullable = true)

root
 |-- policy: string (nullable = true)
 |-- incident_date: string (nullable = true)
 |-- cost: integer (nullable = true)



In [None]:
policyDF = policyDF.withColumn("status", when(policyDF.start_date == policyDF.inception_date,
                               "New Business").otherwise("Renewal"))

policyDF.show(3)

+-------+------+-----------+-----------+--------------+----------+--------+-------+------------+
| policy|  make|vehicle_age|sum_insured|inception_date|start_date|end_date|premium|      status|
+-------+------+-----------+-----------+--------------+----------+--------+-------+------------+
|CAR0001|TOYOTA|          1|      15000|      20180101|  20180101|20181231|   1000|New Business|
|CAR0001|TOYOTA|          2|      13500|      20180101|  20190101|20191231|    900|     Renewal|
|CAR0001|TOYOTA|          3|      12000|      20180101|  20200101|20201231|    800|     Renewal|
+-------+------+-----------+-----------+--------------+----------+--------+-------+------------+
only showing top 3 rows



In [None]:
def fix_dates(df):
  for column in df.columns:
    if column.endswith("_date") and dict(df.dtypes)[column] == 'string':
      print("NOTE:Fixing date column '{}'.".format(column))
      df = df.withColumn(column,to_date(df[column],"yyyyMMdd"))
  return df

policyDF = fix_dates(policyDF)
claimsDF = fix_dates(claimsDF)

NOTE:Fixing date column 'inception_date'.
NOTE:Fixing date column 'start_date'.
NOTE:Fixing date column 'end_date'.
NOTE:Fixing date column 'incident_date'.


In [None]:
#joining

combinedDF = policyDF.select(["policy","make","vehicle_age","sum_insured",
                              "start_date","end_date","premium"]) \
                              .join(claimsDF,(policyDF.policy == claimsDF.policy) &
                                    (policyDF.start_date <= claimsDF.incident_date) &
                                    (policyDF.end_date >= claimsDF.incident_date))
                              
combinedDF.show()

+-------+------+-----------+-----------+----------+----------+-------+-------+-------------+-----+
| policy|  make|vehicle_age|sum_insured|start_date|  end_date|premium| policy|incident_date| cost|
+-------+------+-----------+-----------+----------+----------+-------+-------+-------------+-----+
|CAR0001|TOYOTA|          3|      12000|2020-01-01|2020-12-31|    800|CAR0001|   2020-06-05| 5000|
|CAR0004| MAZDA|          5|      10000|2020-04-02|2021-04-01|    700|CAR0004|   2020-06-10| 3000|
|CAR0007|   BMW|          4|      24000|2020-07-13|2021-07-12|   1600|CAR0007|   2020-09-10|24000|
|CAR0009| TESLA|          1|      72000|2019-09-17|2020-09-16|   4800|CAR0009|   2020-02-10|15000|
+-------+------+-----------+-----------+----------+----------+-------+-------+-------------+-----+



In [None]:
combinedDF = policyDF.select(["policy","make","vehicle_age","sum_insured",
                              "start_date","end_date","premium"]) \
                              .join(claimsDF,(policyDF.policy == claimsDF.policy))
                              
combinedDF.show()

+-------+------+-----------+-----------+----------+----------+-------+-------+-------------+-----+
| policy|  make|vehicle_age|sum_insured|start_date|  end_date|premium| policy|incident_date| cost|
+-------+------+-----------+-----------+----------+----------+-------+-------+-------------+-----+
|CAR0001|TOYOTA|          1|      15000|2018-01-01|2018-12-31|   1000|CAR0001|   2020-06-05| 5000|
|CAR0001|TOYOTA|          2|      13500|2019-01-01|2019-12-31|    900|CAR0001|   2020-06-05| 5000|
|CAR0001|TOYOTA|          3|      12000|2020-01-01|2020-12-31|    800|CAR0001|   2020-06-05| 5000|
|CAR0004| MAZDA|          4|      11000|2019-04-02|2020-04-01|    750|CAR0004|   2020-06-10| 3000|
|CAR0004| MAZDA|          5|      10000|2020-04-02|2021-04-01|    700|CAR0004|   2020-06-10| 3000|
|CAR0007|   BMW|          4|      24000|2020-07-13|2021-07-12|   1600|CAR0007|   2020-09-10|24000|
|CAR0009| TESLA|          1|      72000|2019-09-17|2020-09-16|   4800|CAR0009|   2020-02-10|15000|
|CAR0009| 

In [None]:
#windowing functions

policyDF = policyDF.withColumn("policy_term",rank().over(Window.partitionBy("policy").orderBy("start_date")))\
.withColumn("total_premium",F.sum("premium").over(Window.partitionBy("policy")
.orderBy("start_date"))) \
.withColumn("policy_id",concat(policyDF.policy,lit("~"),col("policy_term")))

policyDF.select(["policy","start_date","policy_term","premium","total_premium","policy_id"]).show()

+-------+----------+-----------+-------+-------------+---------+
| policy|start_date|policy_term|premium|total_premium|policy_id|
+-------+----------+-----------+-------+-------------+---------+
|CAR0002|2020-02-10|          1|    950|          950|CAR0002~1|
|CAR0001|2018-01-01|          1|   1000|         1000|CAR0001~1|
|CAR0001|2019-01-01|          2|    900|         1900|CAR0001~2|
|CAR0001|2020-01-01|          3|    800|         2700|CAR0001~3|
|CAR0006|2020-06-18|          1|    300|          300|CAR0006~1|
|CAR0007|2020-07-13|          1|   1600|         1600|CAR0007~1|
|CAR0008|2020-08-11|          1|   1800|         1800|CAR0008~1|
|CAR0004|2019-04-02|          1|    750|          750|CAR0004~1|
|CAR0004|2020-04-02|          2|    700|         1450|CAR0004~2|
|CAR0009|2019-09-17|          1|   4800|         4800|CAR0009~1|
|CAR0009|2020-09-17|          2|   3500|         8300|CAR0009~2|
|CAR0010|2020-10-15|          1|    300|          300|CAR0010~1|
|CAR0005|2020-05-16|     

In [None]:
#summarization
summary = policyDF.groupBy("status").sum("sum_insured").withColumnRenamed("sum(sum_insured)","total_insured")
summary.show()

+------------+-------------+
|      status|total_insured|
+------------+-------------+
|     Renewal|       107500|
|New Business|       199000|
+------------+-------------+



In [None]:
policyDF.crosstab("status","make").show()

+------------+----+---+----+------+-------+-----+------+------+-----+------+
| status_make|AUDI|BMW|FORD|HOLDEN|HYUNDAI|MAZDA|SUBARU|SUZUKI|TESLA|TOYOTA|
+------------+----+---+----+------+-------+-----+------+------+-----+------+
|New Business|   1|  1|   1|     1|      1|    2|     1|     1|    1|     1|
|     Renewal|   0|  0|   2|     0|      0|    0|     0|     0|    1|     2|
+------------+----+---+----+------+-------+-----+------+------+-----+------+



In [None]:
#CSV

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import TimestampType

df = spark.read.option("header", True).csv("walmart_stock.csv")

#what are the column names?
print(df.columns)
print()

#what does the schema look like?
df = df \
      .withColumn("Date", col("Date").cast(TimestampType())) \
      .withColumn("Open", col("Open").cast(DoubleType())) \
      .withColumn("High", col("High").cast(DoubleType())) \
      .withColumn("Low", col("Low").cast(DoubleType())) \
      .withColumn("Close", col("Close").cast(DoubleType())) \
      .withColumn("Volume", col("Volume").cast(IntegerType())) \
      .withColumn("Adj Close", col("Adj Close").cast(DoubleType()))


df.printSchema()
print()

df.show(5)

print()
df.describe().show()

for row in df.head(5):
  print(row)
  print("\n")



['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)


+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00

In [None]:
#There are too many decimal places for mean and stddev in the describe() dataframe. Format the numbers to just show up to two decimal places.
from pyspark.sql.functions import format_number

summary = df.describe()
summary.select(summary['summary'],
                  format_number(summary['Open'].cast('float'), 2).alias('Open'),
                  format_number(summary['High'].cast('float'), 2).alias('High'),
                  format_number(summary['Low'].cast('float'), 2).alias('Low'),
                  format_number(summary['Close'].cast('float'), 2).alias('Close'),
                  format_number(summary['Volume'].cast('int'),0).alias('Volume')
                 ).show()

+-------+--------+--------+--------+--------+----------+
|summary|    Open|    High|     Low|   Close|    Volume|
+-------+--------+--------+--------+--------+----------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|     1,258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8,222,093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4,519,780|
|    min|   56.39|   57.06|   56.30|   56.42| 2,094,900|
|    max|   90.80|   90.97|   89.25|   90.47|80,898,100|
+-------+--------+--------+--------+--------+----------+



In [None]:
df_hv = df.withColumn('HV Ratio', df['High']/df['Volume']).select(['HV Ratio'])
df_hv.show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



In [None]:
df.orderBy(df['High'].desc()).select(['Date']).head(1)[0]['Date']

datetime.datetime(2015, 1, 13, 0, 0)

In [None]:
from pyspark.sql.functions import mean

df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [None]:
from pyspark.sql.functions import min, max
 
df.select(max('Volume'),min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



In [None]:
df.filter(df['Close'] < 60).count()

81

In [None]:
#What percentage of the time was the High greater than 80 dollars ?
#In other words, (Number of Days High>80)/(Total Days in the dataset)

df.filter('High > 80').count() * 100/df.count()

9.141494435612083

In [None]:
#What is the Pearson correlation between High and Volume?
df.corr('High', 'Volume')

-0.3384326061737161

In [None]:
#What is the max High per year?

from pyspark.sql.functions import year
 
year_df = df.withColumn('Year', year(df['Date']))
 
year_df.groupBy('Year').max()['Year', 'max(High)'].show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



In [None]:
#What is the average Close for each Calendar Month?
#In other words, across all the years, what is the average Close price for Jan,Feb, Mar, etc... Your result will have a value for each of these months.



#Create a new column Month from existing Date column
month_df = df.withColumn('Month', month(df['Date']))
 
#Group by month and take average of all other columns
month_df = month_df.groupBy('Month').mean()
 
#Sort by month
month_df = month_df.orderBy('Month')
 
#Display only month and avg(Close), the desired columns
month_df['Month', 'avg(Close)'].show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



In [None]:
import glob



for file in sorted(glob.glob("./data/policy.parquet/*")):
  print(file)

In [None]:
#sql

df = spark.sql(""" SELECT policy, make, 
                    CASE
                    WHEN inception_date = start_date
                    THEN 'New Business'
                    ELSE 'Renewal'
                    END AS status
                    FROM parquet. ./data/policy.paraquet'
                    ORDER BY policy,start_date""")

df.show()

ERROR:root:An unexpected error occurred while tokenizing input
The following traceback may be corrupted or invalid
The error message is: ('EOF in multi-line string', (1, 46))



ParseException: ignored

In [None]:
policyDF.explain()

== Physical Plan ==
*(1) Project [policy#510, make#511, cast(vehicle_age#512 as int) AS vehicle_age#535, cast(sum_insured#513 as int) AS sum_insured#526, inception_date#514, start_date#515, end_date#516, cast(premium#517 as int) AS premium#544]
+- FileScan csv [policy#510,make#511,vehicle_age#512,sum_insured#513,inception_date#514,start_date#515,end_date#516,premium#517] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/policy.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<policy:string,make:string,vehicle_age:string,sum_insured:string,inception_date:string,star...




In [None]:
policyDF.explain(mode="formatted")

== Physical Plan ==
* Project (2)
+- Scan csv  (1)


(1) Scan csv 
Output [8]: [policy#510, make#511, vehicle_age#512, sum_insured#513, inception_date#514, start_date#515, end_date#516, premium#517]
Batched: false
Location: InMemoryFileIndex [file:/content/policy.csv]
ReadSchema: struct<policy:string,make:string,vehicle_age:string,sum_insured:string,inception_date:string,start_date:string,end_date:string,premium:string>

(2) Project [codegen id : 1]
Output [8]: [policy#510, make#511, cast(vehicle_age#512 as int) AS vehicle_age#535, cast(sum_insured#513 as int) AS sum_insured#526, inception_date#514, start_date#515, end_date#516, cast(premium#517 as int) AS premium#544]
Input [8]: [policy#510, make#511, vehicle_age#512, sum_insured#513, inception_date#514, start_date#515, end_date#516, premium#517]




In [None]:
#parallelize

spark = SparkSession.builder.appName("EX1").getOrCreate()

rdd = spark.sparkContext.parallelize([1,2,3,4,5])
rddCollect = rdd.collect()

print("Number of partitions: " + str(rdd.getNumPartitions()))
print("Actions:First element: " + str(rdd.first()))
print(rddCollect)

emptyRDD = spark.sparkContext.emptyRDD()
emptyRDD2 = rdd = spark.sparkContext.parallelize([])

print("" + str(emptyRDD2.isEmpty()))

Number of partitions: 2
Actions:First element: 1
[1, 2, 3, 4, 5]
True
