# Tutorial-1: Introduction to PySpark

## Import PySpark

In [1]:
import pyspark

## Import and initialize SparkContext

In [2]:
from pyspark import SparkContext, SparkConf                                       
from pyspark.sql import SparkSession

# Option 1 (kept for reference, not executed): build the SparkContext
# directly from a SparkConf.
# conf = SparkConf().setAppName("problem6").setMaster("local[*]")
# sc = SparkContext(conf=conf)

# Option 2 (used in this notebook): create a SparkSession through its builder
# and take the SparkContext from the session.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("problem1")
    .getOrCreate()
)
sc = spark.sparkContext

## Create Parallelized Collections

In [3]:
# A small in-memory Python list used as the source collection.
data = list(range(1, 6))

# Hand the list to the SparkContext to obtain a distributed collection.
distData = sc.parallelize(data)

# Inspect the type of the object returned by parallelize().
type(distData)

pyspark.rdd.RDD

## TODO: Read external data file
### Use SparkContext's textFile function to read in a text file

In [4]:
textFilePath = './emails.txt'

# Preview the raw lines with plain Python. The context manager closes the
# file handle as soon as the preview has been printed instead of leaving it
# open for the lifetime of the kernel.
with open(textFilePath) as email_file:
    print(list(email_file))

# Load the same file through the SparkContext so it can be processed as an RDD.
emails = sc.textFile(textFilePath)
type(emails)

['zhu@icloud.com\n', 'zhu@live.com\n', 'anna@sbcglobal.net\n', 'andrew@yahoo.ca\n', 'andrew@gmail.com\n', 'teddy@comcast.net\n', 'teddy@live.com\n', 'james@yahoo.ca\n', 'james@msn.com\n', 'leslie@optonline.net\n', 'jaarnial@outlook.com\n', 'leslie@hotmail.com\n', 'teddy@yahoo.ca\n', 'jeffcovey@outlook.com\n', 'gator@live.com\n', 'conteb@verizon.net\n', 'aegreene@me.com\n', 'nichoj@icloud.com\n', 'andrew@hotmail.com\n', 'kyle@att.net\n', 'teddy@comcast.net\n', 'kyle@yahoo.ca\n', 'gator@msn.com\n', 'albert@yahoo.com\n', 'nichoj@verizon.net\n', 'albert@yahoo.com\n', 'albert@mac.com\n', 'andrew@sbcglobal.net\n', 'leslie@optonline.net\n', 'kyle@msn.com\n', 'anna@aol.com\n', 'anna@msn.com\n', 'adamk@yahoo.ca\n']


pyspark.rdd.RDD

## Generate a list of data, from 1 to 10

In [5]:
# Integers 1 through 10 inclusive (the upper bound of range() is exclusive).
data = [value for value in range(1, 11)]
print(data)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


## Parallelize the data with 2 partitions

In [6]:
# Distribute `data` across two partitions (the second argument of parallelize).
numbers = sc.parallelize(data, numSlices=2)

## Get only even numbers, and collect them

In [7]:
# Keep the elements whose remainder modulo 2 is zero, then bring them back
# to the driver as a Python list.
even_numbers = numbers.filter(lambda n: n % 2 == 0)
even_numbers.collect()

[2, 4, 6, 8, 10]

## TODO: find emails with hotmail domain

In [8]:
# Keep the addresses that contain the '@hotmail' marker, then collect them.
hotmail_emails = emails.filter(lambda address: '@hotmail' in address)
hotmail_emails.collect()

['leslie@hotmail.com', 'andrew@hotmail.com']

## Square all the numbers in the list using the map operation

In [9]:
# x**2 squares each element; the previous x*2 only doubled it, which did not
# match this section's stated goal of squaring the numbers.
numbers.map(lambda x: x**2).collect()

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

## Use flatMap to apply a function that returns a list and flatten the result

In [10]:
def square_and_cube(value):
    """Return the two-element list ``[value**2, value**3]``."""
    return [value ** 2, value ** 3]


# map() keeps one result per input element, so the result is a list of lists.
m = numbers.map(square_and_cube).collect()

# flatMap() applies the same function but flattens the returned lists into a
# single sequence.
fm = numbers.flatMap(square_and_cube).collect()

print(m)
print(fm)

[[1, 1], [4, 8], [9, 27], [16, 64], [25, 125], [36, 216], [49, 343], [64, 512], [81, 729], [100, 1000]]
[1, 1, 4, 8, 9, 27, 16, 64, 25, 125, 36, 216, 49, 343, 64, 512, 81, 729, 100, 1000]


## TODO: separate username and domain from all emails

### eg: marshuang80@gmail.com  ->  [marshuang80, gmail.com]

In [11]:
# Hint: use Python's str.split() to cut each address at the '@' sign.
username_domain = emails.map(lambda address: address.split('@'))
username_domain.collect()

[['zhu', 'icloud.com'],
 ['zhu', 'live.com'],
 ['anna', 'sbcglobal.net'],
 ['andrew', 'yahoo.ca'],
 ['andrew', 'gmail.com'],
 ['teddy', 'comcast.net'],
 ['teddy', 'live.com'],
 ['james', 'yahoo.ca'],
 ['james', 'msn.com'],
 ['leslie', 'optonline.net'],
 ['jaarnial', 'outlook.com'],
 ['leslie', 'hotmail.com'],
 ['teddy', 'yahoo.ca'],
 ['jeffcovey', 'outlook.com'],
 ['gator', 'live.com'],
 ['conteb', 'verizon.net'],
 ['aegreene', 'me.com'],
 ['nichoj', 'icloud.com'],
 ['andrew', 'hotmail.com'],
 ['kyle', 'att.net'],
 ['teddy', 'comcast.net'],
 ['kyle', 'yahoo.ca'],
 ['gator', 'msn.com'],
 ['albert', 'yahoo.com'],
 ['nichoj', 'verizon.net'],
 ['albert', 'yahoo.com'],
 ['albert', 'mac.com'],
 ['andrew', 'sbcglobal.net'],
 ['leslie', 'optonline.net'],
 ['kyle', 'msn.com'],
 ['anna', 'aol.com'],
 ['anna', 'msn.com'],
 ['adamk', 'yahoo.ca']]

## Reduce by key

In [12]:
# Sample keys: three "a" entries and five "b" entries.
data = ["a", "b", "a", "a", "b", "b", "b", "b"]
rdd = sc.parallelize(data)

# Turn every key into a (key, 1) pair so the ones can be summed per key.
pairRDD = rdd.map(lambda key: (key, 1))

# Add up the values that share a key and collect the totals.
totals = pairRDD.reduceByKey(lambda left, right: left + right)
totals.collect()

[('a', 3), ('b', 5)]

## TODO: count the number of domains with the same username

In [13]:
# Pair every username with a count of 1, one pair per email record.
# The result gets its own name so that `username_domain` keeps holding the
# [username, domain] lists and this cell does not overwrite its own input.
username_counts = username_domain.map(lambda pair: (pair[0], 1))
print("** Results from mapping values to list")
print(username_counts.top(3))

# Sum the 1s per username. No de-duplication is applied, so an address that
# appears twice in the file is counted twice.
print("\n** Results from reduceByKey ** ")
username_counts.reduceByKey(lambda count_a, count_b: count_a + count_b).collect()

** Results from mapping values to list
[('zhu', 1), ('zhu', 1), ('teddy', 1)]

** Results from reduceByKey ** 


[('anna', 3),
 ('andrew', 4),
 ('james', 2),
 ('leslie', 3),
 ('jeffcovey', 1),
 ('gator', 2),
 ('aegreene', 1),
 ('nichoj', 2),
 ('adamk', 1),
 ('zhu', 2),
 ('teddy', 4),
 ('jaarnial', 1),
 ('conteb', 1),
 ('kyle', 3),
 ('albert', 3)]

# PySpark Dataframes

## Read in a csv file

In [14]:
# Read the CSV, treating its first line as the column names.
cars_reader = spark.read.option("header", True)
df = cars_reader.csv("./cars.csv")

# Print the column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- (kW): string (nullable = true)



## Show samples from dataframe

In [15]:
# Print the first 10 rows as a text table.
df.show(n=10)

+----+----------+--------------------+----------+----+
|YEAR|      Make|               Model|      Size|(kW)|
+----+----------+--------------------+----------+----+
|2012|MITSUBISHI|              i-MiEV|SUBCOMPACT|  49|
|2012|    NISSAN|                LEAF|  MID-SIZE|  80|
|2013|      FORD|      FOCUS ELECTRIC|   COMPACT| 107|
|2013|MITSUBISHI|              i-MiEV|SUBCOMPACT|  49|
|2013|    NISSAN|                LEAF|  MID-SIZE|  80|
|2013|     SMART|FORTWO ELECTRIC D...|TWO-SEATER|  35|
|2013|     SMART|FORTWO ELECTRIC D...|TWO-SEATER|  35|
|2013|     TESLA|MODEL S (40 kWh b...| FULL-SIZE| 270|
|2013|     TESLA|MODEL S (60 kWh b...| FULL-SIZE| 270|
|2013|     TESLA|MODEL S (85 kWh b...| FULL-SIZE| 270|
+----+----------+--------------------+----------+----+
only showing top 10 rows



## Filter all cars made in 2015

In [16]:
# Keep the filtered DataFrame itself. show() only prints the rows and returns
# None, so chaining it onto the assignment left df_2015 bound to None.
df_2015 = df.filter(df['YEAR'] == 2015)
df_2015.show(10)

+----+----------+--------------------+--------------------+----+
|YEAR|      Make|               Model|                Size|(kW)|
+----+----------+--------------------+--------------------+----+
|2015|       BMW|                  i3|          SUBCOMPACT| 125|
|2015| CHEVROLET|            SPARK EV|          SUBCOMPACT| 104|
|2015|      FORD|      FOCUS ELECTRIC|             COMPACT| 107|
|2015|       KIA|             SOUL EV|STATION WAGON - S...|  81|
|2015|MITSUBISHI|              i-MiEV|          SUBCOMPACT|  49|
|2015|    NISSAN|                LEAF|            MID-SIZE|  80|
|2015|     SMART|FORTWO ELECTRIC D...|          TWO-SEATER|  35|
|2015|     SMART|FORTWO ELECTRIC D...|          TWO-SEATER|  35|
|2015|     TESLA|MODEL S (60 kWh b...|           FULL-SIZE| 283|
|2015|     TESLA|MODEL S (70 kWh b...|           FULL-SIZE| 283|
+----+----------+--------------------+--------------------+----+
only showing top 10 rows



## Select columns Make, Model and Size

In [17]:
# Project the three columns by name and print the first 10 rows.
df.select('Make', 'Model', 'Size').show(10)

+----------+--------------------+----------+
|      Make|               Model|      Size|
+----------+--------------------+----------+
|MITSUBISHI|              i-MiEV|SUBCOMPACT|
|    NISSAN|                LEAF|  MID-SIZE|
|      FORD|      FOCUS ELECTRIC|   COMPACT|
|MITSUBISHI|              i-MiEV|SUBCOMPACT|
|    NISSAN|                LEAF|  MID-SIZE|
|     SMART|FORTWO ELECTRIC D...|TWO-SEATER|
|     SMART|FORTWO ELECTRIC D...|TWO-SEATER|
|     TESLA|MODEL S (40 kWh b...| FULL-SIZE|
|     TESLA|MODEL S (60 kWh b...| FULL-SIZE|
|     TESLA|MODEL S (85 kWh b...| FULL-SIZE|
+----------+--------------------+----------+
only showing top 10 rows



## Count the number of cars made by each manufacturer

In [18]:
# Group the rows by manufacturer and count the rows in each group.
df_manufacturer = df.groupBy(df["Make"]).count()

df_manufacturer.show()

+----------+-----+
|      Make|count|
+----------+-----+
|    NISSAN|    6|
|      FORD|    4|
| CHEVROLET|    3|
|     TESLA|   23|
|       BMW|    2|
|       KIA|    2|
|     SMART|    8|
|MITSUBISHI|    5|
+----------+-----+



## Sort manufacturers by number of cars made (descending)

In [19]:
# Order the per-manufacturer counts from largest to smallest and print them.
df_manufacturer.orderBy("count", ascending=False).show()

+----------+-----+
|      Make|count|
+----------+-----+
|     TESLA|   23|
|     SMART|    8|
|    NISSAN|    6|
|MITSUBISHI|    5|
|      FORD|    4|
| CHEVROLET|    3|
|       KIA|    2|
|       BMW|    2|
+----------+-----+



## Convert Spark DataFrame to Pandas DataFrame

In [20]:
# Convert the Spark DataFrame into a pandas DataFrame.
df_pd = df.toPandas()

# Display the first 10 rows of the pandas frame.
df_pd.head(n=10)

Unnamed: 0,YEAR,Make,Model,Size,(kW)
0,2012,MITSUBISHI,i-MiEV,SUBCOMPACT,49
1,2012,NISSAN,LEAF,MID-SIZE,80
2,2013,FORD,FOCUS ELECTRIC,COMPACT,107
3,2013,MITSUBISHI,i-MiEV,SUBCOMPACT,49
4,2013,NISSAN,LEAF,MID-SIZE,80
5,2013,SMART,FORTWO ELECTRIC DRIVE CABRIOLET,TWO-SEATER,35
6,2013,SMART,FORTWO ELECTRIC DRIVE COUPE,TWO-SEATER,35
7,2013,TESLA,MODEL S (40 kWh battery),FULL-SIZE,270
8,2013,TESLA,MODEL S (60 kWh battery),FULL-SIZE,270
9,2013,TESLA,MODEL S (85 kWh battery),FULL-SIZE,270


In [21]:
# Summarise every column. The CSV was read with all columns as strings (see
# the printed schema), so the summary shows count/unique/top/freq rather than
# numeric statistics.
df_pd.describe()

Unnamed: 0,YEAR,Make,Model,Size,(kW)
count,53,53,53,53,53
unique,5,8,23,7,15
top,2016,TESLA,i-MiEV,FULL-SIZE,35
freq,19,23,5,21,8


In [22]:
# Shut down the SparkContext now that the tutorial is finished.
sc.stop()