# Exercise on pyspark dataframes

In [2]:
from pyspark.sql import SparkSession
from pathlib import Path
data_dir = Path('/home/juvid/Python-and-Spark-for-Big-Data-master/Spark_DataFrame_Project_Exercise/')

In [3]:
spark = SparkSession.builder.appName('ex').getOrCreate()

21/07/28 09:36:39 WARN Utils: Your hostname, GBLON1WLZ13699 resolves to a loopback address: 127.0.1.1; using 10.164.16.79 instead (on interface eth2)
21/07/28 09:36:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/07/28 09:36:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
df = spark.read.csv(str(data_dir/'walmart_stock.csv'), header=True, inferSchema=True)

In [5]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [6]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



This isn't the right schema, let's try and set it

In [9]:
from pyspark.sql.types import StructType, StructField, DateType, DoubleType, IntegerType

In [8]:
data_schema = [StructField('Date', DateType(), True),  
               StructField('Open', DoubleType(), True),
               StructField('High', DoubleType(), True),
               StructField('Low', DoubleType(), True),
               StructField('Close', DoubleType(), True),
               StructField('Volume', IntegerType(), True),
               StructField('Adj Close', DoubleType(), True),
              ]  

In [10]:
df = spark.read.csv(str(data_dir/'walmart_stock.csv'), schema = StructType(fields=data_schema))

In [11]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



Print first 5 rows

In [12]:
df.head(5)

[Row(Date=None, Open=None, High=None, Low=None, Close=None, Volume=None, Adj Close=None),
 Row(Date=datetime.date(2012, 1, 3), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date=datetime.date(2012, 1, 4), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date=datetime.date(2012, 1, 5), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
 Row(Date=datetime.date(2012, 1, 6), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)]

Describe the dataframe

In [15]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

Format to 2 dp

In [16]:
description = df.describe()

In [21]:
description.printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [19]:
from pyspark.sql.functions import format_number

In [30]:
description.select(description['summary'],
                   format_number(description['Open'].cast('float'), 2).alias('Open'),
                   format_number(description['High'].cast('float'), 2).alias('High'),
                   format_number(description['Close'].cast('float'), 2).alias('Close'),
                   format_number(description['Volume'].cast('float'), 2).alias('Volume'),
                   format_number(description['Adj Close'].cast('float'), 2).alias('Adj Close'),
                  ).show()

+-------+--------+--------+--------+-------------+---------+
|summary|    Open|    High|   Close|       Volume|Adj Close|
+-------+--------+--------+--------+-------------+---------+
|  count|1,258.00|1,258.00|1,258.00|     1,258.00| 1,258.00|
|   mean|   72.36|   72.84|   72.39| 8,222,093.50|    67.24|
| stddev|    6.77|    6.77|    6.76| 4,519,781.00|     6.72|
|    min|   56.39|   57.06|   56.42| 2,094,900.00|    50.36|
|    max|   90.80|   90.97|   90.47|80,898,096.00|    84.91|
+-------+--------+--------+--------+-------------+---------+



Create new datafrane with a new column HV ratio = High price / volume per day.

In [37]:
df_with_hv = df.withColumn('HV Ratio', df['High']/df['Volume'])

In [38]:
df_with_hv.select('HV ratio').show()

+--------------------+
|            HV ratio|
+--------------------+
|                null|
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
+--------------------+
only showing top 20 rows



Find the day with the largest high price

In [61]:
df.orderBy('High', ascending=False).head(1)[0]['Date']

datetime.date(2015, 1, 13)

Find the mean close price

In [62]:
df.agg({'Close': 'avg'}).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



Find max and min volume

In [63]:
from pyspark.sql.functions import max as py_max
from pyspark.sql.functions import min as py_min

In [64]:
df.select(py_max('Volume'), py_min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



Number of days close was lower than 60

In [68]:
df.filter(df['Close'] < 60).count()

81

Percentage of time High was greater than 80 dollars

In [74]:
df.filter(df['High'] > 80).count()/df.count()*100

9.134233518665608

Correlation between High and Volume

In [75]:
from pyspark.ml.stat import Correlation

In [89]:
df = df.na.drop()  # the following method is sensitive to NaNs

In [90]:
df.stat.corr('High', 'Volume')

-0.3384326061737161

Max high per year

In [92]:
from pyspark.sql.functions import year

In [97]:
tmp = df.withColumn('Year', year('Date'))
tmp.groupby('Year').agg({'High': 'max'}).sort('Year').show()



+----+---------+
|Year|max(High)|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



                                                                                

Average close for each calendar month

In [98]:
from pyspark.sql.functions import month

In [101]:
tmp = df.withColumn('Month', month('Date'))
tmp.groupby('Month').agg({'Close': 'avg'}).sort('Month').show()



+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



                                                                                