In [1]:
from pyspark.sql import SparkSession

In [2]:
# Single, local deployment: build the SparkSession for this notebook
# (getOrCreate() hands back an already-active session instead of making a second one).
spark = (
    SparkSession.builder
    .appName("cs544")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/23 05:02:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Entry point for the low-level RDD API.
# While jobs run, open localhost:4040 in a browser to watch them in the Spark UI.
sc = spark.sparkContext

In [4]:
# One million integers (0 through 999,999), built in the driver; they become an RDD next.
nums = [*range(1_000_000)]

In [5]:
# No partition count given, so Spark picks one (getNumPartitions() reports it further down).
rdd = sc.parallelize(nums)

In [6]:
# Confirm we now hold a PySpark RDD rather than a Python list.
type(rdd)

pyspark.rdd.RDD

In [8]:
# Lazy execution demo.
# map() is a *transformation*: Spark only records the plan here and computes nothing yet.
# nums starts at 0, so 1/x will divide by zero, but no error appears until an action runs.
# (A named function could be passed to map() instead of the lambda.)
inverses = rdd.map(lambda value: 1 / value)

In [11]:
# mean() is an *action*, so it finally runs the map above and hits 1/0.
# The failure is caught so Restart & Run All still completes. Only the exception type
# is shown, instead of the very long traceback.
try:
    inverses.mean()
except Exception as err:  # NOTE(review): PySpark wraps the worker-side ZeroDivisionError; exact class varies by version
    print(f"action failed as expected: {type(err).__name__}")

In [12]:
# Fix: filter out the zero first, so map() never divides by it.
inverses = (
    rdd
    .filter(lambda value: value != 0)
    .map(lambda value: 1 / value)
)

In [13]:
# Action: runs filter + map and averages 1/x over x = 1 .. 999,999.
inverses.mean()

23/03/23 05:07:00 WARN TaskSetManager: Stage 1 contains a task of very large size (2329 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

1.4392740115605892e-05

In [14]:
# Action: fetch only the first 10 results (1/1 through 1/10).
inverses.take(10)

23/03/23 05:07:57 WARN TaskSetManager: Stage 2 contains a task of very large size (2329 KiB). The maximum recommended task size is 1000 KiB.


[1.0,
 0.5,
 0.3333333333333333,
 0.25,
 0.2,
 0.16666666666666666,
 0.14285714285714285,
 0.125,
 0.1111111111111111,
 0.1]

In [15]:
# ^ Spark still warns that each task is very large, so let's split the data into more partitions.

In [16]:
# How many partitions did parallelize() choose on its own?
rdd.getNumPartitions()

2

In [19]:
# Rebuild the RDD with 10 partitions so each task carries a smaller slice of nums,
# then rerun the same filter/map/mean pipeline.
rdd = sc.parallelize(nums, numSlices=10)
inverses = rdd.filter(lambda value: value != 0).map(lambda value: 1 / value)
inverses.mean()

                                                                                

1.4392740115605814e-05

In [20]:
# Caching demo.
# Draw a random sample roughly 10% the size of rdd (with replacement; a fixed seed keeps runs repeatable).
sample = rdd.sample(
    withReplacement=True,
    fraction=0.1,
    seed=544,
)

In [22]:
import time

In [23]:
# Time an action on the (not yet cached) sample.
# perf_counter() is a monotonic, high-resolution clock meant for measuring intervals.
# time.time() is wall-clock time and can jump if the system clock is adjusted.
t0 = time.perf_counter()
print(sample.mean())
t1 = time.perf_counter()
print(t1 - t0)



498504.761576394
2.2731735706329346


                                                                                

In [24]:
#asking it to cache
sample = rdd.sample(withReplacement=True, fraction=0.1, seed = 544).cache()
t0 =  time.time()
print(sample.mean())
t1= time.time()
print(t1-t0)
#may be slower bc extra work. but if i run it again its faster



498504.761576394
3.3323020935058594


                                                                                

In [25]:
# Same action again; this time the sample should come from the cache.
t0 = time.perf_counter()  # monotonic clock, suited to interval timing
print(sample.mean())
t1 = time.perf_counter()
print(t1 - t0)
# Faster than the uncached run!



498504.761576394
1.4339439868927002


                                                                                

In [27]:
# rdd was rebuilt with 10 partitions because fewer made each task too large.
# sample is only ~10% of rdd's size yet keeps all 10 partitions, which can be inefficient.
sample.getNumPartitions()

10

In [28]:
# Same sample, but shuffled into a single partition before caching.
sample = rdd.sample(withReplacement=True, fraction=0.1, seed=544).repartition(1).cache()
t0 = time.perf_counter()  # monotonic clock, suited to interval timing
print(sample.mean())
t1 = time.perf_counter()
print(t1 - t0)

[Stage 9:>                                                          (0 + 1) / 1]

498504.76157639606
3.0592968463897705


                                                                                

In [29]:
# Second action on the cached 1-partition sample.
t0 = time.perf_counter()  # monotonic clock, suited to interval timing
print(sample.mean())
t1 = time.perf_counter()
print(t1 - t0)
# Faster! When tuning, measure the impact of the partition count rather than guessing.

498504.76157639606
0.5813136100769043


                                                                                

In [32]:
# Spark DataFrames

In [30]:
! wget https://pages.cs.wisc.edu/~harter/cs639/data/ghcnd-stations.txt

--2023-03-23 05:16:16--  https://pages.cs.wisc.edu/~harter/cs639/data/ghcnd-stations.txt
Resolving pages.cs.wisc.edu (pages.cs.wisc.edu)... 128.105.7.9
Connecting to pages.cs.wisc.edu (pages.cs.wisc.edu)|128.105.7.9|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10607756 (10M) [text/plain]
Saving to: ‘ghcnd-stations.txt’


2023-03-23 05:16:18 (16.7 MB/s) - ‘ghcnd-stations.txt’ saved [10607756/10607756]



In [31]:
# Read the file as plain text, one string row per line (other formats have their own readers).
df = spark.read.text("ghcnd-stations.txt")

In [33]:
# Schema as (column, type) pairs.
df.dtypes

[('value', 'string')]

In [35]:
# The repr shows only the schema. Spark is lazy, so no rows are computed until an action
# such as take() or toPandas() runs.
df

DataFrame[value: string]

In [36]:
# A DataFrame exposes an underlying RDD (of Row objects) through .rdd.
type(df), type(df.rdd)

(pyspark.sql.dataframe.DataFrame, pyspark.rdd.RDD)

In [37]:
# take() is an action: bring the first 10 rows back to the driver.
df.rdd.take(10)

                                                                                

[Row(value='ACW00011604  17.1167  -61.7833   10.1    ST JOHNS COOLIDGE FLD                       '),
 Row(value='ACW00011647  17.1333  -61.7833   19.2    ST JOHNS                                    '),
 Row(value='AE000041196  25.3330   55.5170   34.0    SHARJAH INTER. AIRP            GSN     41196'),
 Row(value='AEM00041194  25.2550   55.3640   10.4    DUBAI INTL                             41194'),
 Row(value='AEM00041217  24.4330   54.6510   26.8    ABU DHABI INTL                         41217'),
 Row(value='AEM00041218  24.2620   55.6090  264.9    AL AIN INTL                            41218'),
 Row(value='AF000040930  35.3170   69.0170 3366.0    NORTH-SALANG                   GSN     40930'),
 Row(value='AFM00040938  34.2100   62.2280  977.2    HERAT                                  40938'),
 Row(value='AFM00040948  34.5660   69.2120 1791.3    KABUL INTL                             40948'),
 Row(value='AFM00040990  31.5000   65.8500 1010.0    KANDAHAR AIRPORT                      

In [38]:
# Convert to pandas. limit() is a lazy transformation; toPandas() is the action that
# collects the (now small) result into a pandas DataFrame.
pandas_df = df.limit(10).toPandas()
pandas_df

Unnamed: 0,value
0,ACW00011604 17.1167 -61.7833 10.1 ST JO...
1,ACW00011647 17.1333 -61.7833 19.2 ST JO...
2,AE000041196 25.3330 55.5170 34.0 SHARJ...
3,AEM00041194 25.2550 55.3640 10.4 DUBAI...
4,AEM00041217 24.4330 54.6510 26.8 ABU D...
5,AEM00041218 24.2620 55.6090 264.9 AL AI...
6,AF000040930 35.3170 69.0170 3366.0 NORTH...
7,AFM00040938 34.2100 62.2280 977.2 HERAT...
8,AFM00040948 34.5660 69.2120 1791.3 KABUL...
9,AFM00040990 31.5000 65.8500 1010.0 KANDA...


In [39]:
# pandas DataFrames are mutable: add a column in place holding the first 11 characters (the station ID).
pandas_df['station'] = pandas_df['value'].str.slice(0, 11)

In [41]:
# Same 10 rows, now with the extra 'station' column.
pandas_df

Unnamed: 0,value,station
0,ACW00011604 17.1167 -61.7833 10.1 ST JO...,ACW00011604
1,ACW00011647 17.1333 -61.7833 19.2 ST JO...,ACW00011647
2,AE000041196 25.3330 55.5170 34.0 SHARJ...,AE000041196
3,AEM00041194 25.2550 55.3640 10.4 DUBAI...,AEM00041194
4,AEM00041217 24.4330 54.6510 26.8 ABU D...,AEM00041217
5,AEM00041218 24.2620 55.6090 264.9 AL AI...,AEM00041218
6,AF000040930 35.3170 69.0170 3366.0 NORTH...,AF000040930
7,AFM00040938 34.2100 62.2280 977.2 HERAT...,AFM00040938
8,AFM00040948 34.5660 69.2120 1791.3 KABUL...,AFM00040948
9,AFM00040990 31.5000 65.8500 1010.0 KANDA...,AFM00040990


In [42]:
# The same column in Spark. RDDs and Spark DataFrames are immutable, so withColumn()
# returns a *new* DataFrame with the extra column rather than modifying df.
from pyspark.sql.functions import expr
# Spark SQL substring(str, pos, len) is 1-based. Position 0 only happens to behave like 1,
# so say 1 explicitly.
df2 = df.withColumn("station", expr("substring(value, 1, 11)"))

In [43]:
# Again only the schema is shown; note the added station column.
df2

DataFrame[value: string, station: string]

In [45]:
# Materialize the first 10 rows to check that station matches the pandas version.
df2.limit(10).toPandas()

Unnamed: 0,value,station
0,ACW00011604 17.1167 -61.7833 10.1 ST JO...,ACW00011604
1,ACW00011647 17.1333 -61.7833 19.2 ST JO...,ACW00011647
2,AE000041196 25.3330 55.5170 34.0 SHARJ...,AE000041196
3,AEM00041194 25.2550 55.3640 10.4 DUBAI...,AEM00041194
4,AEM00041217 24.4330 54.6510 26.8 ABU D...,AEM00041217
5,AEM00041218 24.2620 55.6090 264.9 AL AI...,AEM00041218
6,AF000040930 35.3170 69.0170 3366.0 NORTH...,AF000040930
7,AFM00040938 34.2100 62.2280 977.2 HERAT...,AFM00040938
8,AFM00040948 34.5660 69.2120 1791.3 KABUL...,AFM00040948
9,AFM00040990 31.5000 65.8500 1010.0 KANDA...,AFM00040990
