# Spark Practice

In [1]:
import os
os.getenv("Sympathizer")

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [3]:
spark=SparkSession.builder.master("local").appName("Symp").getOrCreate()
conf=SparkConf().setMaster("local").setAppName("TrialHandsOnSpark")
sc=SparkContext.getOrCreate()

In [4]:
spark

* Load the CSV file

In [5]:
filep="AJANTPHARM.csv"
ajpharmfile=spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(filep)

In [6]:
ajpharmfile

DataFrame[Date: string, Open: double, High: double, Low: double, Last: double, Close: double, Volume: int, Turnover: double]

In [7]:
type(ajpharmfile)

pyspark.sql.dataframe.DataFrame

In [42]:
ajpharmfile.count()

4925

In [43]:
ajpharmfile.dropna().count()

4925

* This will print the schema of the dataframe

In [15]:
ajpharmfile.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Last: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Turnover: double (nullable = true)



In [9]:
ajpharmfile.take(5)

[Row(Date='2000-06-05', Open=30.67, High=30.67, Low=22.67, Last=22.82, Close=23.49, Volume=112642, Turnover=28.5),
 Row(Date='2000-06-06', Open=24.0, High=25.33, Low=22.29, Last=25.33, Close=24.68, Volume=63248, Turnover=15.12),
 Row(Date='2000-06-07', Open=25.3, High=25.32, Low=22.93, Last=23.47, Close=23.35, Volume=42278, Turnover=9.94),
 Row(Date='2000-06-08', Open=23.73, High=24.0, Low=23.33, Last=23.73, Close=23.67, Volume=13650, Turnover=3.22),
 Row(Date='2000-06-09', Open=24.0, High=24.0, Low=23.33, Last=23.73, Close=23.65, Volume=11550, Turnover=2.73)]

In [11]:
ajpharmfile.head(5)

[Row(Date='2000-06-05', Open=30.67, High=30.67, Low=22.67, Last=22.82, Close=23.49, Volume=112642, Turnover=28.5),
 Row(Date='2000-06-06', Open=24.0, High=25.33, Low=22.29, Last=25.33, Close=24.68, Volume=63248, Turnover=15.12),
 Row(Date='2000-06-07', Open=25.3, High=25.32, Low=22.93, Last=23.47, Close=23.35, Volume=42278, Turnover=9.94),
 Row(Date='2000-06-08', Open=23.73, High=24.0, Low=23.33, Last=23.73, Close=23.67, Volume=13650, Turnover=3.22),
 Row(Date='2000-06-09', Open=24.0, High=24.0, Low=23.33, Last=23.73, Close=23.65, Volume=11550, Turnover=2.73)]

* Show method shows the record appropriatly well aligned result

In [14]:
ajpharmfile.show(5, truncate=True)

+----------+-----+-----+-----+-----+-----+------+--------+
|      Date| Open| High|  Low| Last|Close|Volume|Turnover|
+----------+-----+-----+-----+-----+-----+------+--------+
|2000-06-05|30.67|30.67|22.67|22.82|23.49|112642|    28.5|
|2000-06-06| 24.0|25.33|22.29|25.33|24.68| 63248|   15.12|
|2000-06-07| 25.3|25.32|22.93|23.47|23.35| 42278|    9.94|
|2000-06-08|23.73| 24.0|23.33|23.73|23.67| 13650|    3.22|
|2000-06-09| 24.0| 24.0|23.33|23.73|23.65| 11550|    2.73|
+----------+-----+-----+-----+-----+-----+------+--------+
only showing top 5 rows



In [49]:
ajpharmfile.filter(ajpharmfile['Volume']>100000).count()

2788

* Load the new CSV file

In [15]:
namemycsv2file="retail.csv"
mycsv2file=spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(namemycsv2file)

In [16]:
mycsv2file.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [80]:
type(mycsv2file)

pyspark.sql.dataframe.DataFrame

* Order the values by column Quanity in descending order

In [88]:
mycsv2file.orderBy(mycsv2file.Quantity.desc()).show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536437|    17021|NAMASTE SWAGAT IN...|     600|2010-12-01 12:12:00|     0.24|   13694.0|United Kingdom|
|   536477|    21137|BLACK RECORD COVE...|     480|2010-12-01 12:27:00|     3.39|   16210.0|United Kingdom|
|   536387|    22466|FAIRY TALE COTTAG...|     432|2010-12-01 09:58:00|     1.45|   16029.0|United Kingdom|
|   536387|    21731|RED TOADSTOOL LED...|     432|2010-12-01 09:58:00|     1.25|   16029.0|United Kingdom|
|   536584|   84029E|RED WOOLLY HOTTIE...|     384|2010-12-01 16:22:00|     2.95|   13777.0|United Kingdom|
|   536390|    20668|DISCO BALL CHRIST...|     288|2010-12-01 10:19:00|      0.1|   17511.0|United Kingdom|
|   536575|    22095|LADS ON

* Order the values by two columns Quanity & Unit Price both in descending order

In [90]:
mycsv2file.orderBy(mycsv2file.Quantity.desc(), mycsv2file.UnitPrice.desc()).show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536437|    17021|NAMASTE SWAGAT IN...|     600|2010-12-01 12:12:00|     0.24|   13694.0|United Kingdom|
|   536477|    21137|BLACK RECORD COVE...|     480|2010-12-01 12:27:00|     3.39|   16210.0|United Kingdom|
|   536387|    22466|FAIRY TALE COTTAG...|     432|2010-12-01 09:58:00|     1.45|   16029.0|United Kingdom|
|   536387|    21731|RED TOADSTOOL LED...|     432|2010-12-01 09:58:00|     1.25|   16029.0|United Kingdom|
|   536584|   84029E|RED WOOLLY HOTTIE...|     384|2010-12-01 16:22:00|     2.95|   13777.0|United Kingdom|
|   536390|    20668|DISCO BALL CHRIST...|     288|2010-12-01 10:19:00|      0.1|   17511.0|United Kingdom|
|   536575|    22095|LADS ON

* Group by and mean operation on aggregation example

In [53]:
mycsv2file.groupBy('CustomerID').agg({'Quantity': 'mean'}).show()

+----------+------------------+
|CustomerID|     avg(Quantity)|
+----------+------------------+
|   15311.0| 5.472222222222222|
|   16539.0|14.333333333333334|
|   15100.0|              32.0|
|   12583.0|             22.45|
|   15291.0|              56.0|
|   13767.0|18.571428571428573|
|   17760.0|18.333333333333332|
|   17905.0| 3.130434782608696|
|   17924.0|              30.0|
|   17420.0| 7.571428571428571|
|   16928.0|              15.5|
|   14496.0|              17.2|
|   13576.0| 9.818181818181818|
|   13408.0| 49.45454545454545|
|      null|2.4403508771929823|
|   13694.0|167.33333333333334|
|   17908.0|2.9827586206896552|
|   17572.0|               6.0|
|   16552.0|               7.8|
|   17377.0|               6.8|
+----------+------------------+
only showing top 20 rows



In [53]:
mycsv2file.groupBy('CustomerID').agg({'Quantity': 'mean'})

+----------+------------------+
|CustomerID|     avg(Quantity)|
+----------+------------------+
|   15311.0| 5.472222222222222|
|   16539.0|14.333333333333334|
|   15100.0|              32.0|
|   12583.0|             22.45|
|   15291.0|              56.0|
|   13767.0|18.571428571428573|
|   17760.0|18.333333333333332|
|   17905.0| 3.130434782608696|
|   17924.0|              30.0|
|   17420.0| 7.571428571428571|
|   16928.0|              15.5|
|   14496.0|              17.2|
|   13576.0| 9.818181818181818|
|   13408.0| 49.45454545454545|
|      null|2.4403508771929823|
|   13694.0|167.33333333333334|
|   17908.0|2.9827586206896552|
|   17572.0|               6.0|
|   16552.0|               7.8|
|   17377.0|               6.8|
+----------+------------------+
only showing top 20 rows



In [47]:
mycsv2file.fillna(-1).show(2)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows



In [46]:
mycsv2file.count()

3108

* Following shows the difference between count() and distinctCount()

In [36]:
mycsv2file.select('StockCode').count()

3108

* This will return the rows with distinct StockCode values

In [35]:
mycsv2file.select('StockCode').distinct().count()

1351

In [17]:
mycsv2file.show(5, truncate=True)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [44]:
mycsv2file.select('StockCode','Quantity','CustomerID').dropDuplicates().show()

+---------+--------+----------+
|StockCode|Quantity|CustomerID|
+---------+--------+----------+
|    84969|       6|   13047.0|
|   85049E|       2|   15862.0|
|    22771|      12|   16218.0|
|    22077|       2|   17920.0|
|   85184C|       1|   17920.0|
|    22565|       3|   12838.0|
|    22744|       2|   17968.0|
|    22731|       2|   17968.0|
|    21867|       1|   14729.0|
|    22781|       2|   15922.0|
|    21928|      20|   16456.0|
|    21662|       2|      null|
|    22145|       1|      null|
|    22767|      -2|   12472.0|
|    22470|       2|   17841.0|
|    22976|      24|   17760.0|
|    22765|       1|   16274.0|
|    21012|       6|      null|
|    21108|      19|      null|
|    21164|       1|      null|
+---------+--------+----------+
only showing top 20 rows



In [45]:
mycsv2file.select('StockCode','Quantity','CustomerID').dropDuplicates().count()

2909

In [34]:
ajpharmfile.select('Volume','Turnover').show(5)

+------+--------+
|Volume|Turnover|
+------+--------+
|112642|    28.5|
| 63248|   15.12|
| 42278|    9.94|
| 13650|    3.22|
| 11550|    2.73|
+------+--------+
only showing top 5 rows



In [38]:
ajpharmfile.select('Volume','Turnover').count()

4925

In [37]:
ajpharmfile.select('Volume','Turnover').distinct().count()

4909

In [27]:
ajpharmfile.select('Volume').count()

4925

In [40]:
mycsv2file.crosstab('Quantity','StockCode').show(3)

+------------------+-----+-----+-----+-----+-----+------+-------+------+------+-----+-----+------+-----+-----+-----+------+-----+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----

* Demonstration of adding new column by withColumn()

In [97]:
ajpharmfile.withColumn('UpFall', ajpharmfile.Open - ajpharmfile.Close).select('Open','Close','UpFall').show(5)

+-----+-----+-------------------+
| Open|Close|             UpFall|
+-----+-----+-------------------+
|30.67|23.49|  7.180000000000003|
| 24.0|24.68|-0.6799999999999997|
| 25.3|23.35| 1.9499999999999993|
|23.73|23.67|0.05999999999999872|
| 24.0|23.65| 0.3500000000000014|
+-----+-----+-------------------+
only showing top 5 rows

