In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Basics').getOrCreate()

In [3]:
df = spark.read.option('header','true').csv('Data1.csv', inferSchema=True)

In [4]:
df.show()

+----+-------+
| Age|   Name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Name: string (nullable = true)



In [6]:
df.columns

['Age', 'Name']

In [7]:
df.describe()

DataFrame[summary: string, Age: string, Name: string]

In [8]:
df.describe().show()

+-------+------------------+-------+
|summary|               Age|   Name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [9]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [10]:
data_sc = [StructField('Age', IntegerType(), True),
          StructField('Name', StringType(), True)]

In [11]:
final_st = StructType(fields=data_sc)

In [12]:
df = spark.read.option('header','true').csv('Data1.csv', schema=final_st)

In [13]:
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Name: string (nullable = true)



In [14]:
df['Age']

Column<'Age'>

In [15]:
df.select('Age').show()

+----+
| Age|
+----+
|null|
|  30|
|  19|
+----+



In [16]:
df.select(['Age','Name']).show()

+----+-------+
| Age|   Name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [17]:
df.withColumn('newage',df['Age']+2).show()

+----+-------+------+
| Age|   Name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    32|
|  19| Justin|    21|
+----+-------+------+



In [18]:
df.withColumnRenamed('Age', 'my_new_age').show()

+----------+-------+
|my_new_age|   Name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



In [19]:
df.createOrReplaceGlobalTempView('people')

In [20]:
result = spark.sql("SELECT * FROM global_temp.people")

In [21]:
result.show()

+----+-------+
| Age|   Name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [22]:
spark.sql("SELECT * FROM global_temp.people WHERE Age=30").show()

+---+----+
|Age|Name|
+---+----+
| 30|Andy|
+---+----+



### Spark Dataframe Basic Operation

In [23]:
df = spark.read.csv('appl_stock.csv', inferSchema=True, header=True)

In [24]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)



In [25]:
df.show()

+----------+--------+--------+--------+--------+---------+---------+
|      Date|    Open|    High|     Low|   Close|Adj Close|   Volume|
+----------+--------+--------+--------+--------+---------+---------+
|1980-12-12|0.128348|0.128906|0.128348|0.128348| 0.100323|469033600|
|1980-12-15| 0.12221| 0.12221|0.121652|0.121652| 0.095089|175884800|
|1980-12-16|0.113281|0.113281|0.112723|0.112723|  0.08811|105728000|
|1980-12-17|0.115513|0.116071|0.115513|0.115513| 0.090291| 86441600|
|1980-12-18|0.118862| 0.11942|0.118862|0.118862| 0.092908| 73449600|
|1980-12-19|0.126116|0.126674|0.126116|0.126116| 0.098578| 48630400|
|1980-12-22|0.132254|0.132813|0.132254|0.132254| 0.103376| 37363200|
|1980-12-23|0.137835|0.138393|0.137835|0.137835| 0.107739| 46950400|
|1980-12-24|0.145089|0.145647|0.145089|0.145089| 0.113409| 48003200|
|1980-12-26|0.158482| 0.15904|0.158482|0.158482| 0.123877| 55574400|
|1980-12-29|0.160714|0.161272|0.160714|0.160714| 0.125622| 93161600|
|1980-12-30|0.157366|0.157366|0.15

In [26]:
df.filter('Close < 500').show()

+----------+--------+--------+--------+--------+---------+---------+
|      Date|    Open|    High|     Low|   Close|Adj Close|   Volume|
+----------+--------+--------+--------+--------+---------+---------+
|1980-12-12|0.128348|0.128906|0.128348|0.128348| 0.100323|469033600|
|1980-12-15| 0.12221| 0.12221|0.121652|0.121652| 0.095089|175884800|
|1980-12-16|0.113281|0.113281|0.112723|0.112723|  0.08811|105728000|
|1980-12-17|0.115513|0.116071|0.115513|0.115513| 0.090291| 86441600|
|1980-12-18|0.118862| 0.11942|0.118862|0.118862| 0.092908| 73449600|
|1980-12-19|0.126116|0.126674|0.126116|0.126116| 0.098578| 48630400|
|1980-12-22|0.132254|0.132813|0.132254|0.132254| 0.103376| 37363200|
|1980-12-23|0.137835|0.138393|0.137835|0.137835| 0.107739| 46950400|
|1980-12-24|0.145089|0.145647|0.145089|0.145089| 0.113409| 48003200|
|1980-12-26|0.158482| 0.15904|0.158482|0.158482| 0.123877| 55574400|
|1980-12-29|0.160714|0.161272|0.160714|0.160714| 0.125622| 93161600|
|1980-12-30|0.157366|0.157366|0.15

In [27]:
df.filter('Close < 500').select(['Open','Close']).show()

+--------+--------+
|    Open|   Close|
+--------+--------+
|0.128348|0.128348|
| 0.12221|0.121652|
|0.113281|0.112723|
|0.115513|0.115513|
|0.118862|0.118862|
|0.126116|0.126116|
|0.132254|0.132254|
|0.137835|0.137835|
|0.145089|0.145089|
|0.158482|0.158482|
|0.160714|0.160714|
|0.157366|0.156808|
|0.152902|0.152344|
|0.154018|0.154018|
|0.151228| 0.15067|
|0.144531|0.143973|
|0.138393|0.137835|
|0.135603|0.135045|
|0.142299|0.142299|
|0.142299|0.141183|
+--------+--------+
only showing top 20 rows



In [28]:
df.filter((df['Close'] < 500) & (df['Open'] > 0)).show()

+----------+--------+--------+--------+--------+---------+---------+
|      Date|    Open|    High|     Low|   Close|Adj Close|   Volume|
+----------+--------+--------+--------+--------+---------+---------+
|1980-12-12|0.128348|0.128906|0.128348|0.128348| 0.100323|469033600|
|1980-12-15| 0.12221| 0.12221|0.121652|0.121652| 0.095089|175884800|
|1980-12-16|0.113281|0.113281|0.112723|0.112723|  0.08811|105728000|
|1980-12-17|0.115513|0.116071|0.115513|0.115513| 0.090291| 86441600|
|1980-12-18|0.118862| 0.11942|0.118862|0.118862| 0.092908| 73449600|
|1980-12-19|0.126116|0.126674|0.126116|0.126116| 0.098578| 48630400|
|1980-12-22|0.132254|0.132813|0.132254|0.132254| 0.103376| 37363200|
|1980-12-23|0.137835|0.138393|0.137835|0.137835| 0.107739| 46950400|
|1980-12-24|0.145089|0.145647|0.145089|0.145089| 0.113409| 48003200|
|1980-12-26|0.158482| 0.15904|0.158482|0.158482| 0.123877| 55574400|
|1980-12-29|0.160714|0.161272|0.160714|0.160714| 0.125622| 93161600|
|1980-12-30|0.157366|0.157366|0.15

In [29]:
result = df.filter(df['Low'] == 0.128348).collect()
result

[Row(Date=datetime.date(1980, 12, 12), Open=0.128348, High=0.128906, Low=0.128348, Close=0.128348, Adj Close=0.100323, Volume=469033600),
 Row(Date=datetime.date(1981, 2, 6), Open=0.128348, High=0.128906, Low=0.128348, Close=0.128348, Adj Close=0.100323, Volume=13865600),
 Row(Date=datetime.date(1981, 4, 27), Open=0.128906, High=0.128906, Low=0.128348, Close=0.128348, Adj Close=0.100323, Volume=38528000),
 Row(Date=datetime.date(1982, 11, 8), Open=0.134487, High=0.135603, Low=0.128348, Close=0.128906, Adj Close=0.100759, Volume=119190400),
 Row(Date=datetime.date(1982, 11, 9), Open=0.128906, High=0.134487, Low=0.128348, Close=0.133371, Adj Close=0.104249, Volume=179782400),
 Row(Date=datetime.date(1982, 11, 24), Open=0.128906, High=0.136161, Low=0.128348, Close=0.131696, Adj Close=0.10294, Volume=73740800),
 Row(Date=datetime.date(1982, 11, 30), Open=0.128906, High=0.142857, Low=0.128348, Close=0.142299, Adj Close=0.111228, Volume=159196800),
 Row(Date=datetime.date(1983, 1, 11), Open=

In [30]:
row_1 = result[0]
row_1.asDict()

{'Date': datetime.date(1980, 12, 12),
 'Open': 0.128348,
 'High': 0.128906,
 'Low': 0.128348,
 'Close': 0.128348,
 'Adj Close': 0.100323,
 'Volume': 469033600}

### GroupBy and Aggregate Function

In [31]:
df = spark.read.csv('sales_data_sample.csv', inferSchema=True, header=True)

In [32]:
df.show()

+-------+-----------+--------------------+
|  Sales|    Company|              Person|
+-------+-----------+--------------------+
| 2871.0|Motorcycles|   Land of Toys Inc.|
| 2765.9|Motorcycles|  Reims Collectables|
|3884.34|Motorcycles|     Lyon Souveniers|
| 3746.7|Motorcycles|   Toys4GrownUps.com|
|5205.27|Motorcycles|Corporate Gift Id...|
|3479.76|Motorcycles|Technics Stores Inc.|
|2497.77|Motorcycles|Daedalus Designs ...|
|5512.32|Motorcycles|        Herkku Gifts|
|2168.54|Motorcycles|     Mini Wheels Co.|
|4708.44|Motorcycles|    Auto Canal Petit|
|3965.66|Motorcycles|Australian Collec...|
|2333.12|Motorcycles|     Vitachrome Inc.|
|3188.64|Motorcycles|Tekni Collectable...|
|3676.76|Motorcycles|     Gift Depot Inc.|
|4177.35|Motorcycles|   La Rochelle Gifts|
|4099.68|Motorcycles|Marta's Replicas Co.|
|2597.39|Motorcycles|Toys of Finland, Co.|
|4394.38|Motorcycles|  Baane Mini Imports|
|4358.04|Motorcycles|Diecast Classics ...|
|4396.14|Motorcycles|   Land of Toys Inc.|
+-------+--

In [33]:
df.printSchema()

root
 |-- Sales: double (nullable = true)
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)



In [34]:
df.groupBy('Company').mean().show()

+----------------+------------------+
|         Company|        avg(Sales)|
+----------------+------------------+
|     Motorcycles| 3523.831842900303|
|    Vintage Cars|3135.3391103789113|
|           Ships| 3053.150128205128|
|Trucks and Buses|3746.8100996677726|
|    Classic Cars|4053.3771044467394|
|          Trains|2938.2268831168826|
|          Planes|3186.2861764705885|
+----------------+------------------+



In [35]:
df.agg({'Sales':'Sum'}).show()

+-------------------+
|         sum(Sales)|
+-------------------+
|1.003262885000001E7|
+-------------------+



In [36]:
group_data = df.groupBy('Company')

In [37]:
group_data.agg({'Sales':'max'}).show()

+----------------+----------+
|         Company|max(Sales)|
+----------------+----------+
|     Motorcycles|   11886.6|
|    Vintage Cars|   14082.8|
|           Ships|   6960.48|
|Trucks and Buses|   8844.12|
|    Classic Cars|   12001.0|
|          Trains|   8977.05|
|          Planes|   10066.6|
+----------------+----------+



In [38]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [39]:
df.select(countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                 2763|
+---------------------+



In [40]:
df.select(avg('Sales').alias('Average Sales')).show()

+----------------+
|   Average Sales|
+----------------+
|3553.88907190932|
+----------------+



In [41]:
df.select(stddev('Sales')).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|1841.8651057401842|
+------------------+



In [42]:
from pyspark.sql.functions import format_number

In [43]:
sales_std = df.select(stddev('Sales').alias('std'))
sales_std.select(format_number('std', 2).alias('std')).show()

+--------+
|     std|
+--------+
|1,841.87|
+--------+



In [44]:
df.orderBy('Sales').show()

+------+----------------+--------------------+
| Sales|         Company|              Person|
+------+----------------+--------------------+
|482.13|Trucks and Buses|   La Rochelle Gifts|
|541.14|    Vintage Cars|The Sharp Gifts W...|
|553.95|    Vintage Cars|Tokyo Collectable...|
| 577.6|    Vintage Cars|  Amica Models & Co.|
|640.05|    Classic Cars|Salzburg Collecta...|
| 651.8|     Motorcycles|   Gifts4AllAges.com|
|652.35|    Vintage Cars|Souveniers And Th...|
| 683.8|    Vintage Cars|Corrida Auto Repl...|
| 694.6|    Vintage Cars|  Auto Assoc. & Cie.|
| 703.6|    Vintage Cars|Marseille Mini Autos|
| 710.2|    Vintage Cars|      AV Stores, Co.|
| 717.4|    Classic Cars|Mini Gifts Distri...|
|721.44|    Classic Cars|La Corne D'abonda...|
| 728.4|Trucks and Buses|Euro Shopping Cha...|
|733.11|     Motorcycles|   La Rochelle Gifts|
|759.46|    Vintage Cars|Australian Collec...|
| 777.0|    Classic Cars|Euro Shopping Cha...|
|785.64|    Vintage Cars|    Handji Gifts& Co|
| 813.2|    C

In [45]:
df.orderBy(df['Sales'].desc()).show()

+-------+------------+--------------------+
|  Sales|     Company|              Person|
+-------+------------+--------------------+
|14082.8|Vintage Cars|The Sharp Gifts W...|
|12536.5|Vintage Cars|Online Diecast Cr...|
|12001.0|Classic Cars|Euro Shopping Cha...|
|11887.8|Classic Cars|Euro Shopping Cha...|
|11886.6| Motorcycles|UK Collectables, ...|
|11739.7|Classic Cars|         Mini Caravy|
|11623.7|Classic Cars|Mini Gifts Distri...|
|11336.7|Vintage Cars|     Mini Wheels Co.|
|11279.2|Classic Cars|  Muscle Machine Inc|
|10993.5|Classic Cars|Dragon Souveniers...|
|10758.0|Vintage Cars|Tokyo Collectable...|
|10606.2|Classic Cars| Suominen Souveniers|
|10468.9|Classic Cars|Danish Wholesale ...|
|10172.7|Classic Cars|  Auto Assoc. & Cie.|
|10066.6|      Planes|    FunGiftIdeas.com|
|10039.6|      Planes|   La Rochelle Gifts|
|9774.03| Motorcycles|Australian Collec...|
| 9720.0|Vintage Cars|     Lyon Souveniers|
|9661.44|Classic Cars|The Sharp Gifts W...|
| 9631.0|Classic Cars|Online Min

### Missing Data

In [46]:
df = spark.read.csv('Data_1.csv', inferSchema=True, header=True)
df.show()

+---+-------+-----+
| Id|   Name|Sales|
+---+-------+-----+
|  1|Michael| null|
|  2|   null| null|
|  3|   null|345.0|
|  4|  Cindy|456.1|
+---+-------+-----+



In [47]:
df.na.drop().show()

+---+-----+-----+
| Id| Name|Sales|
+---+-----+-----+
|  4|Cindy|456.1|
+---+-----+-----+



In [48]:
df.na.drop(thresh=2).show()

+---+-------+-----+
| Id|   Name|Sales|
+---+-------+-----+
|  1|Michael| null|
|  3|   null|345.0|
|  4|  Cindy|456.1|
+---+-------+-----+



In [49]:
df.na.drop(how='all').show()

+---+-------+-----+
| Id|   Name|Sales|
+---+-------+-----+
|  1|Michael| null|
|  2|   null| null|
|  3|   null|345.0|
|  4|  Cindy|456.1|
+---+-------+-----+



In [50]:
df.na.drop(subset=['Sales']).show()

+---+-----+-----+
| Id| Name|Sales|
+---+-----+-----+
|  3| null|345.0|
|  4|Cindy|456.1|
+---+-----+-----+



In [51]:
df.na.fill('Fill Values').show()

+---+-----------+-----+
| Id|       Name|Sales|
+---+-----------+-----+
|  1|    Michael| null|
|  2|Fill Values| null|
|  3|Fill Values|345.0|
|  4|      Cindy|456.1|
+---+-----------+-----+



In [52]:
df.na.fill('No Name', subset=['Name']).show()

+---+-------+-----+
| Id|   Name|Sales|
+---+-------+-----+
|  1|Michael| null|
|  2|No Name| null|
|  3|No Name|345.0|
|  4|  Cindy|456.1|
+---+-------+-----+



In [53]:
from pyspark.sql.functions import mean

In [54]:
mean_val = df.select(mean(df['Sales'])).collect()
print(mean_val)
print(mean_val[0])
print(mean_val[0][0])

[Row(avg(Sales)=400.55)]
Row(avg(Sales)=400.55)
400.55


In [55]:
df.na.fill(mean_val[0][0], subset=['Sales']).show()

+---+-------+------+
| Id|   Name| Sales|
+---+-------+------+
|  1|Michael|400.55|
|  2|   null|400.55|
|  3|   null| 345.0|
|  4|  Cindy| 456.1|
+---+-------+------+



In [56]:
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0], ['Sales']).show()

+---+-------+------+
| Id|   Name| Sales|
+---+-------+------+
|  1|Michael|400.55|
|  2|   null|400.55|
|  3|   null| 345.0|
|  4|  Cindy| 456.1|
+---+-------+------+



### Date & Timestamps

In [57]:
df = spark.read.csv('appl_stock.csv', inferSchema=True, header=True)

In [58]:
df.head(1)

[Row(Date=datetime.date(1980, 12, 12), Open=0.128348, High=0.128906, Low=0.128348, Close=0.128348, Adj Close=0.100323, Volume=469033600)]

In [59]:
from pyspark.sql.functions import dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format

In [60]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|              12|
|              15|
|              16|
|              17|
|              18|
|              19|
|              22|
|              23|
|              24|
|              26|
|              29|
|              30|
|              31|
|               2|
|               5|
|               6|
|               7|
|               8|
|               9|
|              12|
+----------------+
only showing top 20 rows



In [61]:
df.select(year(df["Date"])).show()

+----------+
|year(Date)|
+----------+
|      1980|
|      1980|
|      1980|
|      1980|
|      1980|
|      1980|
|      1980|
|      1980|
|      1980|
|      1980|
|      1980|
|      1980|
|      1980|
|      1981|
|      1981|
|      1981|
|      1981|
|      1981|
|      1981|
|      1981|
+----------+
only showing top 20 rows



In [62]:
newdf = df.withColumn('Year', year(df['Date']))

In [63]:
newdf.groupBy('Year').mean().show()

+----+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+---------+
|Year|          avg(Open)|          avg(High)|           avg(Low)|         avg(Close)|     avg(Adj Close)|         avg(Volume)|avg(Year)|
+----+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+---------+
|1990| 0.3348390830039528| 0.3412443596837945|0.32872493280632425|0.33537290513834017|0.26991642292490087|1.7550174229249012E8|   1990.0|
|2003| 0.3307461349206348|0.33649096428571457| 0.3251247341269843| 0.3311564642857142|0.28356080158730157| 2.826597333333333E8|   2003.0|
|2007|  4.585324422310758|  4.645374227091633|  4.506589378486057|  4.581210864541833| 3.9227735418326684| 9.840477513944223E8|   2007.0|
|2018|  47.27785866533861|  47.74852592828685| 46.795876565737025|  47.26335669721114|  45.51715897609561|1.3608025816733068E8|   2018.0|
|2015| 30.043938484126986|  30.311

In [64]:
result = newdf.groupBy('Year').mean().select(['Year', 'avg(Close)'])

In [65]:
new  = result.withColumnRenamed('avg(Close)', 'Average Closing Price')
new.show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|1990|  0.33537290513834017|
|2003|   0.3311564642857142|
|2007|    4.581210864541833|
|2018|    47.26335669721114|
|2015|   30.009999976190482|
|2006|   2.5289513107569728|
|2022|    167.4470177017544|
|2013|     16.8798171547619|
|1988|   0.3708842411067196|
|1997|   0.1604176679841897|
|1994|   0.3042905634920634|
|2014|    23.06616354365081|
|2019|    52.06398806349207|
|2004|   0.6344097261904763|
|1991|  0.46870147826086955|
|1982|  0.08545888142292486|
|1989|   0.3719529682539681|
|1996|  0.22249534645669278|
|1998|   0.2729004801587301|
|1985|  0.09023348809523808|
+----+---------------------+
only showing top 20 rows



In [66]:
new.select(['Year', format_number('Average Closing Price', 2).alias('Average Closing Price (.2)')]).show()

+----+--------------------------+
|Year|Average Closing Price (.2)|
+----+--------------------------+
|1990|                      0.34|
|2003|                      0.33|
|2007|                      4.58|
|2018|                     47.26|
|2015|                     30.01|
|2006|                      2.53|
|2022|                    167.45|
|2013|                     16.88|
|1988|                      0.37|
|1997|                      0.16|
|1994|                      0.30|
|2014|                     23.07|
|2019|                     52.06|
|2004|                      0.63|
|1991|                      0.47|
|1982|                      0.09|
|1989|                      0.37|
|1996|                      0.22|
|1998|                      0.27|
|1985|                      0.09|
+----+--------------------------+
only showing top 20 rows

