In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
import findspark
findspark.init("/home/rajdeep/spark-3.5.0-bin-hadoop3")

In [3]:
from pyspark.sql import SparkSession

import warnings
warnings.filterwarnings("ignore")

In [4]:
spark = SparkSession.builder.appName("basics").getOrCreate()

23/11/20 17:14:13 WARN Utils: Your hostname, DESKTOP-CSFBOLK resolves to a loopback address: 127.0.1.1; using 172.19.12.103 instead (on interface eth0)
23/11/20 17:14:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/20 17:14:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
df = spark.read.json("data/people.json")

                                                                                

In [6]:
df.show(5)

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [7]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



<font size="4">Specifying the datatype for each column so as to avoid any type of error similar to scala
Using StructType in production ensures that the data is correctly interpreted and structured</font>

In [8]:
from pyspark.sql.types import StructField,StructType,IntegerType,StringType

In [9]:
schema = StructType([StructField('age',IntegerType(),True),StructField('name',StringType(),True)])

In [10]:
df = spark.read.json('data/people.json',schema=schema)

In [11]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [12]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



<font size="4">Spark basic operation</font>

In [13]:
df = spark.read.csv('data/appl_stock.csv', inferSchema=True, header=True )

In [14]:
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [15]:
#filter rows which have closing greater than 500

In [16]:
df.filter(df['Close']>500).show()

+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|      Date|              Open|              High|               Low|             Close|   Volume|        Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|2012-02-13|        499.529991|503.83000899999996|497.08998899999995|502.60002099999997|129304000|        65.116633|
|2012-02-14|        504.659988|         509.56002|        502.000008|        509.459991|115099600|        66.005408|
|2012-02-16|        491.500008|        504.890007|         486.62999|502.20999900000004|236138000|        65.066102|
|2012-02-17|        503.109993|507.77002000000005|        500.299995|         502.12001|133951300|        65.054443|
|2012-02-21|506.88001299999996|        514.850021|504.12000300000005|        514.850021|151398800|        66.703738|
|2012-02-22|        513.079994|        515.489983|509.0700230000

In [17]:
#filter row which have close less than 200 but open greater than 200

In [18]:
df.filter((df['Open']>200) & (df['Close']<200)).show()

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



<font size="4">Spark groupby and agg operation</font>

In [19]:
df = spark.read.csv('data/sales_info.csv',inferSchema=True, header=True)

In [20]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [21]:
df.groupBy(df['Company']).sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [22]:
df.agg({'Sales':'max'}).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [23]:
from pyspark.sql.functions import stddev,count_distinct,avg, format_number

In [24]:
df.select(count_distinct(df['Sales']).alias('Total_Distinct_Sales')).show()

+--------------------+
|Total_Distinct_Sales|
+--------------------+
|                  11|
+--------------------+



In [25]:
df.select(format_number(stddev(df['Sales']),4).alias('stddev_sales')).show()

+------------+
|stddev_sales|
+------------+
|    250.0874|
+------------+



In [26]:
df.select(format_number(avg(df['Sales']),2).alias('avg_sales')).show()

+---------+
|avg_sales|
+---------+
|   360.58|
+---------+



In [27]:
df.orderBy(df['Sales']).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [28]:
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



<font size="4">Handle Missing Data</font>

In [29]:
#there are 2 ways of handling missing data such as dropping rows or filling it with some value

In [30]:
#dropping NA data

In [31]:
df = spark.read.csv('data/ContainsNull.csv',inferSchema=True, header=True)

In [32]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [33]:
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [34]:
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [35]:
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [36]:
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [37]:
#filling the NA values

In [38]:
df.na.fill("Fill Values").show()

+----+-----------+-----+
|  Id|       Name|Sales|
+----+-----------+-----+
|emp1|       John| NULL|
|emp2|Fill Values| NULL|
|emp3|Fill Values|345.0|
|emp4|      Cindy|456.0|
+----+-----------+-----+



In [39]:
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| NULL|  0.0|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [40]:
df.na.fill('No Name').show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| NULL|
|emp2|No Name| NULL|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [41]:
from pyspark.sql.functions import mean

In [42]:
mean_sales = df.select(mean(df['Sales'])).collect()

In [43]:
mean_sales = mean_sales[0][0]

In [44]:
df.na.fill(mean_sales,['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| NULL|400.5|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



<font size=4>Dates and Timestamp</font>

In [45]:
df = spark.read.csv('data/appl_stock.csv', inferSchema=True, header=True)

In [46]:
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [47]:
from pyspark.sql.functions import dayofmonth,dayofyear, weekofyear,hour, month,year, col

In [48]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [49]:
df.select(dayofyear(df['Date'])).show()

+---------------+
|dayofyear(Date)|
+---------------+
|              4|
|              5|
|              6|
|              7|
|              8|
|             11|
|             12|
|             13|
|             14|
|             15|
|             19|
|             20|
|             21|
|             22|
|             25|
|             26|
|             27|
|             28|
|             29|
|             32|
+---------------+
only showing top 20 rows



In [50]:
new_df = df.withColumn('Year',year(df['Date']))

In [54]:
new_df.groupBy('year').mean().select([col('year').alias('Year'),col('avg(Close)').alias('Avg. Closing Price')]).show()

+----+------------------+
|Year|Avg. Closing Price|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+

