In [42]:
import pyspark

In [43]:
from pyspark.sql import SparkSession

In [44]:
# Start (or reuse, via getOrCreate) a SparkSession named 'Practice' -- the entry point for the DataFrame API.
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

In [45]:
# Echo the session object so the notebook displays it.
spark

In [46]:
# Load the CSV into a DataFrame, taking column names from the first line.
# No schema inference here, so every column comes back as a string (see the repr below).
df_spark = spark.read.csv('AirPassengers.csv', header=True)
df_spark

DataFrame[Month: string, #Passengers: string]

In [47]:
df_spark.show(n=20, truncate=True)  # first 20 rows, long values truncated (show()'s defaults)

+-------+-----------+
|  Month|#Passengers|
+-------+-----------+
|1949-01|        112|
|1949-02|        118|
|1949-03|        132|
|1949-04|        129|
|1949-05|        121|
|1949-06|        135|
|1949-07|        148|
|1949-08|        148|
|1949-09|        136|
|1949-10|        119|
|1949-11|        104|
|1949-12|        118|
|1950-01|        115|
|1950-02|        126|
|1950-03|        141|
|1950-04|        135|
|1950-05|        125|
|1950-06|        149|
|1950-07|        170|
|1950-08|        170|
+-------+-----------+
only showing top 20 rows



In [48]:
# Confirm this is a pyspark.sql DataFrame, not a pandas one.
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [49]:
# First row as a single Row object (same as head() with no argument).
df_spark.first()

Row(Month='1949-01', #Passengers='112')

In [50]:
# Print each column's name, type and nullability as a tree -- roughly the
# column summary pandas' DataFrame.info() gives.
df_spark.printSchema()

root
 |-- Month: string (nullable = true)
 |-- #Passengers: string (nullable = true)



In [51]:
# By default every CSV column is read as a string; pass inferSchema=True when reading to have Spark infer numeric types instead.

In [52]:
# Re-read the CSV, this time letting Spark infer column types from the data
# (#Passengers becomes int instead of string).
df_spark = (
    spark.read
    .options(header='true', inferSchema='true')
    .csv('AirPassengers.csv')
)
df_spark

DataFrame[Month: string, #Passengers: int]

In [53]:
# The same read, with both reader options passed as keyword arguments to a generic load().
df_spark = spark.read.format('csv').load('AirPassengers.csv', header=True, inferSchema=True)
df_spark

DataFrame[Month: string, #Passengers: int]

In [54]:
df_spark.show(n=20, truncate=True)  # first 20 rows of the type-inferred frame

+-------+-----------+
|  Month|#Passengers|
+-------+-----------+
|1949-01|        112|
|1949-02|        118|
|1949-03|        132|
|1949-04|        129|
|1949-05|        121|
|1949-06|        135|
|1949-07|        148|
|1949-08|        148|
|1949-09|        136|
|1949-10|        119|
|1949-11|        104|
|1949-12|        118|
|1950-01|        115|
|1950-02|        126|
|1950-03|        141|
|1950-04|        135|
|1950-05|        125|
|1950-06|        149|
|1950-07|        170|
|1950-08|        170|
+-------+-----------+
only showing top 20 rows



In [55]:
# A Spark DataFrame is a table-like data structure with named, typed columns; the next cells inspect its columns and rows.

In [56]:
# Column names, read straight from the schema's fields (what .columns returns).
[field.name for field in df_spark.schema.fields]

['Month', '#Passengers']

In [57]:
# First 10 rows as a list of Row objects (head(10) delegates to take(10)).
df_spark.take(10)

[Row(Month='1949-01', #Passengers=112),
 Row(Month='1949-02', #Passengers=118),
 Row(Month='1949-03', #Passengers=132),
 Row(Month='1949-04', #Passengers=129),
 Row(Month='1949-05', #Passengers=121),
 Row(Month='1949-06', #Passengers=135),
 Row(Month='1949-07', #Passengers=148),
 Row(Month='1949-08', #Passengers=148),
 Row(Month='1949-09', #Passengers=136),
 Row(Month='1949-10', #Passengers=119)]

In [77]:
# Selecting columns: select() returns a new DataFrame rather than the column values

In [58]:
# select() is lazy: only the resulting schema is displayed, no rows are computed.
df_spark.select(df_spark['Month'])

DataFrame[Month: string]

In [59]:
df_spark.select(df_spark.Month).show()  # .show() triggers the computation

+-------+
|  Month|
+-------+
|1949-01|
|1949-02|
|1949-03|
|1949-04|
|1949-05|
|1949-06|
|1949-07|
|1949-08|
|1949-09|
|1949-10|
|1949-11|
|1949-12|
|1950-01|
|1950-02|
|1950-03|
|1950-04|
|1950-05|
|1950-06|
|1950-07|
|1950-08|
+-------+
only showing top 20 rows



In [60]:
# Indexing with a list is shorthand for select(); the result is still a DataFrame.
type(df_spark[['Month']])

pyspark.sql.dataframe.DataFrame

In [61]:
df_spark.select(df_spark['Month'], df_spark['#Passengers']).show()

+-------+-----------+
|  Month|#Passengers|
+-------+-----------+
|1949-01|        112|
|1949-02|        118|
|1949-03|        132|
|1949-04|        129|
|1949-05|        121|
|1949-06|        135|
|1949-07|        148|
|1949-08|        148|
|1949-09|        136|
|1949-10|        119|
|1949-11|        104|
|1949-12|        118|
|1950-01|        115|
|1950-02|        126|
|1950-03|        141|
|1950-04|        135|
|1950-05|        125|
|1950-06|        149|
|1950-07|        170|
|1950-08|        170|
+-------+-----------+
only showing top 20 rows



In [62]:
# select() also accepts a single list of column names.
wanted_columns = ['Month', '#Passengers']
df_spark.select(wanted_columns).show()

+-------+-----------+
|  Month|#Passengers|
+-------+-----------+
|1949-01|        112|
|1949-02|        118|
|1949-03|        132|
|1949-04|        129|
|1949-05|        121|
|1949-06|        135|
|1949-07|        148|
|1949-08|        148|
|1949-09|        136|
|1949-10|        119|
|1949-11|        104|
|1949-12|        118|
|1950-01|        115|
|1950-02|        126|
|1950-03|        141|
|1950-04|        135|
|1950-05|        125|
|1950-06|        149|
|1950-07|        170|
|1950-08|        170|
+-------+-----------+
only showing top 20 rows



In [63]:
# Bracket indexing with a name gives a Column expression, not data. Brackets are
# required for names like '#Passengers' that are not valid Python attributes.
df_spark['Month']

Column<'Month'>

In [64]:
# (name, type) pairs for every column, built from the schema (equivalent to .dtypes).
[(field.name, field.dataType.simpleString()) for field in df_spark.schema.fields]

[('Month', 'string'), ('#Passengers', 'int')]

In [78]:
# describe(): count, mean, stddev, min and max for each column

In [65]:
summary_stats = df_spark.describe()  # count / mean / stddev / min / max per column
summary_stats.show()

+-------+-------+------------------+
|summary|  Month|       #Passengers|
+-------+-------+------------------+
|  count|    144|               144|
|   mean|   null| 280.2986111111111|
| stddev|   null|119.96631694294322|
|    min|1949-01|               104|
|    max|1960-12|               622|
+-------+-------+------------------+



In [66]:
# Adding a column: withColumn returns a new DataFrame, so assign the result back

In [67]:
# Derived column: passenger count plus 2. withColumn returns a new DataFrame, so reassign.
passengers_plus_two = df_spark['#Passengers'] + 2
df_spark = df_spark.withColumn('Passengers1', passengers_plus_two)

In [71]:
df_spark.show(n=20, truncate=True)  # now includes Passengers1

+-------+-----------+-----------+
|  Month|#Passengers|Passengers1|
+-------+-----------+-----------+
|1949-01|        112|        114|
|1949-02|        118|        120|
|1949-03|        132|        134|
|1949-04|        129|        131|
|1949-05|        121|        123|
|1949-06|        135|        137|
|1949-07|        148|        150|
|1949-08|        148|        150|
|1949-09|        136|        138|
|1949-10|        119|        121|
|1949-11|        104|        106|
|1949-12|        118|        120|
|1950-01|        115|        117|
|1950-02|        126|        128|
|1950-03|        141|        143|
|1950-04|        135|        137|
|1950-05|        125|        127|
|1950-06|        149|        151|
|1950-07|        170|        172|
|1950-08|        170|        172|
+-------+-----------+-----------+
only showing top 20 rows



In [69]:
# Number of rows (an action: Spark actually scans the data here).
df_spark.count()

144

In [None]:
# Dropping columns: drop() returns a new DataFrame and leaves df_spark unchanged

In [74]:
# drop() returns a new DataFrame; df_spark itself still has Passengers1.
without_passengers1 = df_spark.drop('Passengers1')
without_passengers1.show()

+-------+-----------+
|  Month|#Passengers|
+-------+-----------+
|1949-01|        112|
|1949-02|        118|
|1949-03|        132|
|1949-04|        129|
|1949-05|        121|
|1949-06|        135|
|1949-07|        148|
|1949-08|        148|
|1949-09|        136|
|1949-10|        119|
|1949-11|        104|
|1949-12|        118|
|1950-01|        115|
|1950-02|        126|
|1950-03|        141|
|1950-04|        135|
|1950-05|        125|
|1950-06|        149|
|1950-07|        170|
|1950-08|        170|
+-------+-----------+
only showing top 20 rows



In [75]:
# Renaming a column with withColumnRenamed (also returns a new DataFrame)

In [76]:
# Rename '#Passengers' -> 'Passengers' in a new DataFrame; df_spark is unchanged.
renamed = df_spark.withColumnRenamed('#Passengers', 'Passengers')
renamed.show()

+-------+----------+-----------+
|  Month|Passengers|Passengers1|
+-------+----------+-----------+
|1949-01|       112|        114|
|1949-02|       118|        120|
|1949-03|       132|        134|
|1949-04|       129|        131|
|1949-05|       121|        123|
|1949-06|       135|        137|
|1949-07|       148|        150|
|1949-08|       148|        150|
|1949-09|       136|        138|
|1949-10|       119|        121|
|1949-11|       104|        106|
|1949-12|       118|        120|
|1950-01|       115|        117|
|1950-02|       126|        128|
|1950-03|       141|        143|
|1950-04|       135|        137|
|1950-05|       125|        127|
|1950-06|       149|        151|
|1950-07|       170|        172|
|1950-08|       170|        172|
+-------+----------+-----------+
only showing top 20 rows



# Handling missing values

In [114]:
# Reload the CSV for the missing-value examples.
# NOTE(review): the output below has nulls in rows 4-5 that the first read of this
# same path did not have. The file appears to have been edited between cells, so
# Restart & Run All will not reproduce these results. Consider saving the edited
# data under its own file name.
df_spark = spark.read.csv('AirPassengers.csv', header='true', inferSchema=True)
df_spark

DataFrame[Month: string, #Passengers: int]

In [115]:
df_spark.show(n=20, truncate=True)  # note the null rows

+-------+-----------+
|  Month|#Passengers|
+-------+-----------+
|1949-01|        112|
|1949-02|        118|
|1949-03|        132|
|1949-04|       null|
|   null|       null|
|1949-06|        135|
|1949-07|        148|
|1949-08|        148|
|1949-09|        136|
|1949-10|        119|
|1949-11|        104|
|1949-12|        118|
|1950-01|        115|
|1950-02|        126|
|1950-03|        141|
|1950-04|        135|
|1950-05|        125|
|1950-06|        149|
|1950-07|        170|
|1950-08|        170|
+-------+-----------+
only showing top 20 rows



In [116]:
df_spark.dropna(how='any').show()  # drop rows containing at least one null (the default)

+-------+-----------+
|  Month|#Passengers|
+-------+-----------+
|1949-01|        112|
|1949-02|        118|
|1949-03|        132|
|1949-06|        135|
|1949-07|        148|
|1949-08|        148|
|1949-09|        136|
|1949-10|        119|
|1949-11|        104|
|1949-12|        118|
|1950-01|        115|
|1950-02|        126|
|1950-03|        141|
|1950-04|        135|
|1950-05|        125|
|1950-06|        149|
|1950-07|        170|
|1950-08|        170|
|1950-09|        158|
|1950-10|        133|
+-------+-----------+
only showing top 20 rows



In [117]:
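# how='any' (the default) drops a row if any column is null; how='all' drops it only when every column is null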

In [118]:
df_spark.dropna(how='all').show()  # drop only rows where every column is null

+-------+-----------+
|  Month|#Passengers|
+-------+-----------+
|1949-01|        112|
|1949-02|        118|
|1949-03|        132|
|1949-04|       null|
|1949-06|        135|
|1949-07|        148|
|1949-08|        148|
|1949-09|        136|
|1949-10|        119|
|1949-11|        104|
|1949-12|        118|
|1950-01|        115|
|1950-02|        126|
|1950-03|        141|
|1950-04|        135|
|1950-05|        125|
|1950-06|        149|
|1950-07|        170|
|1950-08|        170|
|1950-09|        158|
+-------+-----------+
only showing top 20 rows



In [119]:
# thresh: keep only rows with at least `thresh` NON-null values; rows with fewer are dropped (overrides how when set)

In [120]:
# subset: only check these columns for nulls when deciding which rows to drop

In [121]:
# Filling missing values: na.fill only touches columns whose type matches the fill value

In [122]:
# A string fill value only affects string columns: Month gets '0', while the
# int #Passengers nulls are left untouched.
df_spark.fillna('0').show()

+-------+-----------+
|  Month|#Passengers|
+-------+-----------+
|1949-01|        112|
|1949-02|        118|
|1949-03|        132|
|1949-04|       null|
|      0|       null|
|1949-06|        135|
|1949-07|        148|
|1949-08|        148|
|1949-09|        136|
|1949-10|        119|
|1949-11|        104|
|1949-12|        118|
|1950-01|        115|
|1950-02|        126|
|1950-03|        141|
|1950-04|        135|
|1950-05|        125|
|1950-06|        149|
|1950-07|        170|
|1950-08|        170|
+-------+-----------+
only showing top 20 rows



In [125]:
# Fill nulls with 0 in #Passengers only; the per-column dict form replaces value + subset.
df_spark.fillna({'#Passengers': 0}).show()

+-------+-----------+
|  Month|#Passengers|
+-------+-----------+
|1949-01|        112|
|1949-02|        118|
|1949-03|        132|
|1949-04|          0|
|   null|          0|
|1949-06|        135|
|1949-07|        148|
|1949-08|        148|
|1949-09|        136|
|1949-10|        119|
|1949-11|        104|
|1949-12|        118|
|1950-01|        115|
|1950-02|        126|
|1950-03|        141|
|1950-04|        135|
|1950-05|        125|
|1950-06|        149|
|1950-07|        170|
|1950-08|        170|
+-------+-----------+
only showing top 20 rows



In [126]:
# Imputing missing values with pyspark.ml's Imputer (replaces nulls with a column statistic such as the mean)

In [127]:
from pyspark.ml.feature import Imputer

In [129]:
# Mean imputation: nulls in #Passengers are replaced by the column mean,
# written to a new column 'Imputed_#Passengers'.
imputer = Imputer(
    inputCols=['#Passengers'],
    outputCols=['Imputed_#Passengers'],
    strategy='mean',
)

In [133]:
# fit() computes the statistic from df_spark; transform() adds the imputed column.
imputer_model = imputer.fit(df_spark)
imputer_model.transform(df_spark).show()

+-------+-----------+-------------------+
|  Month|#Passengers|Imputed_#Passengers|
+-------+-----------+-------------------+
|1949-01|        112|                112|
|1949-02|        118|                118|
|1949-03|        132|                132|
|1949-04|       null|                283|
|   null|       null|                283|
|1949-06|        135|                135|
|1949-07|        148|                148|
|1949-08|        148|                148|
|1949-09|        136|                136|
|1949-10|        119|                119|
|1949-11|        104|                104|
|1949-12|        118|                118|
|1950-01|        115|                115|
|1950-02|        126|                126|
|1950-03|        141|                141|
|1950-04|        135|                135|
|1950-05|        125|                125|
|1950-06|        149|                149|
|1950-07|        170|                170|
|1950-08|        170|                170|
+-------+-----------+-------------

In [132]:
df_spark.show(n=20, truncate=True)  # unchanged: transform() returned a new DataFrame

+-------+-----------+
|  Month|#Passengers|
+-------+-----------+
|1949-01|        112|
|1949-02|        118|
|1949-03|        132|
|1949-04|       null|
|   null|       null|
|1949-06|        135|
|1949-07|        148|
|1949-08|        148|
|1949-09|        136|
|1949-10|        119|
|1949-11|        104|
|1949-12|        118|
|1950-01|        115|
|1950-02|        126|
|1950-03|        141|
|1950-04|        135|
|1950-05|        125|
|1950-06|        149|
|1950-07|        170|
|1950-08|        170|
+-------+-----------+
only showing top 20 rows



# Filter Operations

In [165]:
# Reload the CSV for the filter examples.
# NOTE(review): the schema below names the column 'Passengers', whereas earlier reads
# of this same path gave '#Passengers'. The file was edited again between cells;
# use a separate file name so the notebook reproduces top-to-bottom.
df_spark = spark.read.csv('AirPassengers.csv', header='true', inferSchema=True)
df_spark

DataFrame[Month: string, Passengers: int]

In [145]:
df_spark.show(n=20, truncate=True)

+-------+----------+
|  Month|Passengers|
+-------+----------+
|1949-01|       112|
|1949-02|       118|
|1949-03|       132|
|1949-04|      null|
|   null|      null|
|1949-06|       135|
|1949-07|       148|
|1949-08|       148|
|1949-09|       136|
|1949-10|       119|
|1949-11|       104|
|1949-12|       118|
|1950-01|       115|
|1950-02|       126|
|1950-03|       141|
|1950-04|       135|
|1950-05|       125|
|1950-06|       149|
|1950-07|       170|
|1950-08|       170|
+-------+----------+
only showing top 20 rows



In [146]:
# Fill string-column nulls with '0'.
# NOTE(review): this writes '0' into Month, which is not a valid 'yyyy-MM' value.
df_spark = df_spark.fillna('0')

In [147]:
# Fill numeric-column nulls with 0.
# NOTE(review): missing counts become 0 and then show up as "< 120 passengers"
# months in the negated filter further down; dropping or imputing may be more honest.
df_spark = df_spark.fillna(0)

In [148]:
df_spark.show(n=20, truncate=True)  # no nulls left

+-------+----------+
|  Month|Passengers|
+-------+----------+
|1949-01|       112|
|1949-02|       118|
|1949-03|       132|
|1949-04|         0|
|      0|         0|
|1949-06|       135|
|1949-07|       148|
|1949-08|       148|
|1949-09|       136|
|1949-10|       119|
|1949-11|       104|
|1949-12|       118|
|1950-01|       115|
|1950-02|       126|
|1950-03|       141|
|1950-04|       135|
|1950-05|       125|
|1950-06|       149|
|1950-07|       170|
|1950-08|       170|
+-------+----------+
only showing top 20 rows



In [150]:
df_spark.where("Passengers >= 120").show()  # where() is an alias of filter(); SQL-expression form

+-------+----------+
|  Month|Passengers|
+-------+----------+
|1949-03|       132|
|1949-06|       135|
|1949-07|       148|
|1949-08|       148|
|1949-09|       136|
|1950-02|       126|
|1950-03|       141|
|1950-04|       135|
|1950-05|       125|
|1950-06|       149|
|1950-07|       170|
|1950-08|       170|
|1950-09|       158|
|1950-10|       133|
|1950-12|       140|
|1951-01|       145|
|1951-02|       150|
|1951-03|       178|
|1951-04|       163|
|1951-05|       172|
+-------+----------+
only showing top 20 rows



In [153]:
df_spark.where(df_spark.Passengers >= 120).show()  # Column-expression form of the same filter

+-------+----------+
|  Month|Passengers|
+-------+----------+
|1949-03|       132|
|1949-06|       135|
|1949-07|       148|
|1949-08|       148|
|1949-09|       136|
|1950-02|       126|
|1950-03|       141|
|1950-04|       135|
|1950-05|       125|
|1950-06|       149|
|1950-07|       170|
|1950-08|       170|
|1950-09|       158|
|1950-10|       133|
|1950-12|       140|
|1951-01|       145|
|1951-02|       150|
|1951-03|       178|
|1951-04|       163|
|1951-05|       172|
+-------+----------+
only showing top 20 rows



In [154]:
# between() is inclusive on both ends, i.e. (>= 120) & (<= 150).
df_spark.filter(df_spark["Passengers"].between(120, 150)).show()

+-------+----------+
|  Month|Passengers|
+-------+----------+
|1949-03|       132|
|1949-06|       135|
|1949-07|       148|
|1949-08|       148|
|1949-09|       136|
|1950-02|       126|
|1950-03|       141|
|1950-04|       135|
|1950-05|       125|
|1950-06|       149|
|1950-10|       133|
|1950-12|       140|
|1951-01|       145|
|1951-02|       150|
|1951-11|       146|
+-------+----------+



In [155]:
# `~` negates a boolean Column: keep rows with fewer than 120 passengers.
at_least_120 = df_spark["Passengers"] >= 120
df_spark.filter(~at_least_120).show()

+-------+----------+
|  Month|Passengers|
+-------+----------+
|1949-01|       112|
|1949-02|       118|
|1949-04|         0|
|      0|         0|
|1949-10|       119|
|1949-11|       104|
|1949-12|       118|
|1950-01|       115|
|1950-11|       114|
|1953-01|         0|
|1953-06|         0|
+-------+----------+



# Groupby

In [177]:
from pyspark.sql.functions import to_date,month

In [188]:
# Reload the CSV (with nulls, not zero-filled) for the groupBy examples.
# NOTE(review): the to_date output below lists 1949-04-01 twice. The data on disk
# seems to differ again from earlier reads; verify the source file.
df_spark = spark.read.csv('AirPassengers.csv', header='true', inferSchema=True)
df_spark

DataFrame[Month: string, Passengers: int]

In [192]:
# Parse the 'yyyy-MM' strings into real dates (the day becomes the 1st) and keep Passengers.
df_spark = df_spark.select(
    to_date('Month', 'yyyy-MM').alias('Date'),
    'Passengers',
)

In [193]:
df_spark.show(n=20, truncate=True)

+----------+----------+
|      Date|Passengers|
+----------+----------+
|1949-01-01|       112|
|1949-02-01|       118|
|1949-03-01|       132|
|1949-04-01|      null|
|1949-04-01|      null|
|1949-06-01|       135|
|1949-07-01|       148|
|1949-08-01|       148|
|1949-09-01|       136|
|1949-10-01|       119|
|1949-11-01|       104|
|1949-12-01|       118|
|1950-01-01|       115|
|1950-02-01|       126|
|1950-03-01|       141|
|1950-04-01|       135|
|1950-05-01|       125|
|1950-06-01|       149|
|1950-07-01|       170|
|1950-08-01|       170|
+----------+----------+
only showing top 20 rows



In [194]:
# Display the schema: Date is now a real date type.
df_spark

DataFrame[Date: date, Passengers: int]

In [195]:
# Append the calendar month (1-12) extracted from Date as Month_Number.
df_spark = df_spark.withColumn('Month_Number', month('Date'))
df_spark

DataFrame[Date: date, Passengers: int, Month_Number: int]

In [196]:
df_spark.show(n=20, truncate=True)

+----------+----------+------------+
|      Date|Passengers|Month_Number|
+----------+----------+------------+
|1949-01-01|       112|           1|
|1949-02-01|       118|           2|
|1949-03-01|       132|           3|
|1949-04-01|      null|           4|
|1949-04-01|      null|           4|
|1949-06-01|       135|           6|
|1949-07-01|       148|           7|
|1949-08-01|       148|           8|
|1949-09-01|       136|           9|
|1949-10-01|       119|          10|
|1949-11-01|       104|          11|
|1949-12-01|       118|          12|
|1950-01-01|       115|           1|
|1950-02-01|       126|           2|
|1950-03-01|       141|           3|
|1950-04-01|       135|           4|
|1950-05-01|       125|           5|
|1950-06-01|       149|           6|
|1950-07-01|       170|           7|
|1950-08-01|       170|           8|
+----------+----------+------------+
only showing top 20 rows



In [201]:
# Aggregate only 'Passengers': a bare .sum() also sums the Month_Number grouping
# key itself, producing a meaningless sum(Month_Number) column.
df_spark.groupBy('Month_Number').sum('Passengers')

DataFrame[Month_Number: int, sum(Passengers): bigint]

In [202]:
df_spark.groupBy('Month_Number').sum().show()

+------------+---------------+-----------------+
|Month_Number|sum(Passengers)|sum(Month_Number)|
+------------+---------------+-----------------+
|          12|           3142|              144|
|           1|           2705|               12|
|           6|           3497|               72|
|           3|           3242|               36|
|           5|           3141|               55|
|           9|           3629|              108|
|           4|           3076|               52|
|           8|           4213|               96|
|           7|           4216|               84|
|          10|           3199|              120|
|          11|           2794|              132|
|           2|           2820|               24|
+------------+---------------+-----------------+



In [203]:
df_spark.groupBy('Month_Number').mean().show()

+------------+------------------+-----------------+
|Month_Number|   avg(Passengers)|avg(Month_Number)|
+------------+------------------+-----------------+
|          12| 261.8333333333333|             12.0|
|           1| 245.9090909090909|              1.0|
|           6|317.90909090909093|              6.0|
|           3| 270.1666666666667|              3.0|
|           5|285.54545454545456|              5.0|
|           9| 302.4166666666667|              9.0|
|           4| 279.6363636363636|              4.0|
|           8| 351.0833333333333|              8.0|
|           7| 351.3333333333333|              7.0|
|          10| 266.5833333333333|             10.0|
|          11|232.83333333333334|             11.0|
|           2|             235.0|              2.0|
+------------+------------------+-----------------+



In [204]:
df_spark.groupBy('Month_Number').count().show()

+------------+-----+
|Month_Number|count|
+------------+-----+
|          12|   12|
|           1|   12|
|           6|   12|
|           3|   12|
|           5|   11|
|           9|   12|
|           4|   13|
|           8|   12|
|           7|   12|
|          10|   12|
|          11|   12|
|           2|   12|
+------------+-----+



In [210]:
# Grand total of Passengers over all rows (DataFrame.agg is shorthand for groupBy().agg()).
df_spark.groupBy().agg({'Passengers': 'sum'}).show()

+---------------+
|sum(Passengers)|
+---------------+
|          39674|
+---------------+

