In [1]:
import pandas as pd

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

sc = pyspark.SparkContext('local[*]')
spark_session = SparkSession.builder.getOrCreate()

In [2]:
spark_session

In [3]:
spark_session.catalog.listTables()

[]

In [4]:
pop19_df = pd.read_csv('population_2019.csv')
gdp19_df = pd.DataFrame({'country':['China', 'India', 'US', 'Indonesia', 'Pakistan',
                                    'Brazil', 'Nigeria', 'Bangladesh', 'Russia'],
                         'continent':['Asia', 'Asia', 'North America', 'Asia', 'Asia',
                                    'South America', 'Africa', 'Asia', 'Europe']},
                        columns=['country', 'continent'])

In [5]:
pop19_df.head(3)

Unnamed: 0,Rank,name,pop2019(k),pop2018,GrowthRate,area,Density
0,1,China,1433783.686,,1.0039,9706961.0,147.7068
1,2,India,1366417.754,,1.0099,3287590.0,415.629
2,3,US,329064.917,,1.0059,9372610.0,35.1092


In [6]:
gdp19_df.head(3)

Unnamed: 0,country,continent
0,China,Asia
1,India,Asia
2,US,North America


In [7]:
spark_temp_pop19 = spark_session.createDataFrame(pop19_df)
spark_temp_pop19.createOrReplaceTempView('pop19')

spark_temp_gdp19 = spark_session.createDataFrame(gdp19_df)
spark_temp_gdp19.createOrReplaceTempView('gdp19')

In [8]:
spark_session.catalog.listTables()

[Table(name='gdp19', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='pop19', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [9]:
pop19_sp = spark_session.sql('SELECT * FROM pop19')
gdp19_sp = spark_session.sql('SELECT * FROM gdp19')

## pyspark vs. pandas

### Check dataframe size

In [10]:
%time
# pyspark
print('pop19_sp row number: %d' % pop19_sp.count())
print('pop19_sp size: (%d, %d)' % (pop19_sp.count(), len(pop19_sp.columns)))

CPU times: user 10 µs, sys: 2 µs, total: 12 µs
Wall time: 37.4 µs
pop19_sp row number: 237
pop19_sp size: (237, 7)


In [11]:
%time
# pandas
print('pop19_df row number: ' + str(len(pop19_df)))
print('pop19_df size: ' + str(pop19_df.shape))

CPU times: user 6 µs, sys: 0 ns, total: 6 µs
Wall time: 23.8 µs
pop19_df row number: 237
pop19_df size: (237, 7)


### Check unique values of a column

In [12]:
%time
# pyspark
pop19_sp.select('name').distinct().show(pop19_sp.count())

CPU times: user 5 µs, sys: 1 µs, total: 6 µs
Wall time: 22.9 µs
+--------------------+
|                name|
+--------------------+
|                Chad|
|              Russia|
|            Paraguay|
|            Anguilla|
|       North Ireland|
|               Yemen|
|             Senegal|
|              Sweden|
|             Tokelau|
|            Kiribati|
|              Guyana|
|         Philippines|
|             Eritrea|
|            Djibouti|
|               Tonga|
|            Malaysia|
|           Singapore|
|                Fiji|
|              Turkey|
|United States Vir...|
|              Malawi|
|      Western Sahara|
|                Iraq|
|             Germany|
|Northern Mariana ...|
|             Comoros|
|         Afghanistan|
|            Cambodia|
|              Rwanda|
|              Jordan|
|            Maldives|
|               Sudan|
|               Palau|
|              France|
|Turks and Caicos ...|
|              Greece|
|           Sri Lanka|
|          Monts

In [13]:
%time
# pandas
pop19_df['name'].unique()

CPU times: user 8 µs, sys: 3 µs, total: 11 µs
Wall time: 6.64 ms


array(['China', 'India', 'US', 'Indonesia', 'Pakistan', 'Brazil',
       'Nigeria', 'Bangladesh', 'Russia', 'Mexico', 'Japan', 'Ethiopia',
       'Philippines', 'Egypt', 'Vietnam',
       'Democratic Republic of the Congo', 'Germany', 'Turkey', 'Iran',
       'Thailand', 'UK', 'France', 'Italy', 'South Africa', 'Tanzania',
       'Myanmar', 'Kenya', 'South Korea', 'Colombia', 'Spain',
       'Argentina', 'Uganda', 'Ukraine', 'Algeria', 'Sudan', 'Iraq',
       'Afghanistan', 'Poland', 'Canada', 'Morocco', 'Saudi Arabia',
       'Uzbekistan', 'Peru', 'Malaysia', 'Angola', 'Ghana', 'Mozambique',
       'Yemen', 'Nepal', 'Venezuela', 'Madagascar', 'Cameroon',
       "Cote d'Ivoire", 'North Korea', 'Australia', 'Taiwan', 'Niger',
       'Sri Lanka', 'Burkina Faso', 'Mali', 'Romania', 'Chile', 'Malawi',
       'Kazakhstan', 'Zambia', 'Guatemala', 'Ecuador', 'Netherlands',
       'Syria', 'Cambodia', 'Senegal', 'Chad', 'Somalia', 'Zimbabwe',
       'Guinea', 'Rwanda', 'Benin', 'Tunisia', 'Bel

### Creating a new column

In [14]:
%time
# pyspark
pop19_sp = pop19_sp.withColumn('pop2018', pop19_sp['pop2019(k)'] / pop19_sp['GrowthRate'])

CPU times: user 204 µs, sys: 58 µs, total: 262 µs
Wall time: 295 µs


In [15]:
%time
# pandas
pop19_df.loc[:, 'pop2018'] = pop19_df['pop2019(k)'] / pop19_df['GrowthRate']

CPU times: user 8 µs, sys: 2 µs, total: 10 µs
Wall time: 587 µs


### Filtering

In [16]:
%time
# pyspark
pop19_sp.filter(pop19_sp['Rank'] < 10).show()

CPU times: user 4 µs, sys: 1e+03 ns, total: 5 µs
Wall time: 23.8 µs
+----+----------+-----------+------------------+----------+-----------+---------+
|Rank|      name| pop2019(k)|           pop2018|GrowthRate|       area|  Density|
+----+----------+-----------+------------------+----------+-----------+---------+
|   1|     China|1433783.686|1428213.6527542584|    1.0039|  9706961.0| 147.7068|
|   2|     India|1366417.754|1353022.8280027725|    1.0099|  3287590.0|  415.629|
|   3|        US| 329064.917|327134.82155283826|    1.0059|  9372610.0|  35.1092|
|   4| Indonesia| 270625.568|  267760.530325517|    1.0107|  1904569.0| 142.0928|
|   5|  Pakistan| 216565.318|212318.93921568626|      1.02|   881912.0| 245.5634|
|   6|    Brazil| 211049.527|209540.83300238283|    1.0072|  8515767.0|  24.7834|
|   7|   Nigeria| 200963.599|195909.14310781826|    1.0258|   923768.0| 217.5477|
|   8|Bangladesh| 163046.161| 161415.8608058608|    1.0101|   147570.0|1104.8734|
|   9|    Russia| 145872.256|1

In [17]:
%time
# pandas
pop19_df[pop19_df['Rank'] < 10]

CPU times: user 10 µs, sys: 0 ns, total: 10 µs
Wall time: 28.4 µs


Unnamed: 0,Rank,name,pop2019(k),pop2018,GrowthRate,area,Density
0,1,China,1433783.686,1428214.0,1.0039,9706961.0,147.7068
1,2,India,1366417.754,1353023.0,1.0099,3287590.0,415.629
2,3,US,329064.917,327134.8,1.0059,9372610.0,35.1092
3,4,Indonesia,270625.568,267760.5,1.0107,1904569.0,142.0928
4,5,Pakistan,216565.318,212318.9,1.02,881912.0,245.5634
5,6,Brazil,211049.527,209540.8,1.0072,8515767.0,24.7834
6,7,Nigeria,200963.599,195909.1,1.0258,923768.0,217.5477
7,8,Bangladesh,163046.161,161415.9,1.0101,147570.0,1104.8734
8,9,Russia,145872.256,145813.9,1.0004,17098242.0,8.5314


### Selecting a list of columns

In [18]:
%time
# pyspark
pop19_sp.select('Rank', 'name', 'Density').show(5)

CPU times: user 0 ns, sys: 252 µs, total: 252 µs
Wall time: 1.48 ms
+----+---------+--------+
|Rank|     name| Density|
+----+---------+--------+
|   1|    China|147.7068|
|   2|    India| 415.629|
|   3|       US| 35.1092|
|   4|Indonesia|142.0928|
|   5| Pakistan|245.5634|
+----+---------+--------+
only showing top 5 rows



In [19]:
%time
# pandas
pop19_df[['Rank', 'name', 'Density']].head()

CPU times: user 12 µs, sys: 3 µs, total: 15 µs
Wall time: 34.1 µs


Unnamed: 0,Rank,name,Density
0,1,China,147.7068
1,2,India,415.629
2,3,US,35.1092
3,4,Indonesia,142.0928
4,5,Pakistan,245.5634


### Aggregating

In [20]:
%time
# pyspark
pop19_sp.agg(F.sum('pop2019(k)')).collect()[0][0]

CPU times: user 10 µs, sys: 0 ns, total: 10 µs
Wall time: 31.7 µs


7715333.1000000015

In [21]:
%time
# pandas
pop19_df['pop2019(k)'].sum()

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 23.6 µs


7715333.1

### Rename column

In [22]:
%time
# pyspark
pop19_sp = pop19_sp.withColumnRenamed('name', 'country')
pop19_sp.show(5)

CPU times: user 0 ns, sys: 8 µs, total: 8 µs
Wall time: 125 µs
+----+---------+-----------+------------------+----------+---------+--------+
|Rank|  country| pop2019(k)|           pop2018|GrowthRate|     area| Density|
+----+---------+-----------+------------------+----------+---------+--------+
|   1|    China|1433783.686|1428213.6527542584|    1.0039|9706961.0|147.7068|
|   2|    India|1366417.754|1353022.8280027725|    1.0099|3287590.0| 415.629|
|   3|       US| 329064.917|327134.82155283826|    1.0059|9372610.0| 35.1092|
|   4|Indonesia| 270625.568|  267760.530325517|    1.0107|1904569.0|142.0928|
|   5| Pakistan| 216565.318|212318.93921568626|      1.02| 881912.0|245.5634|
+----+---------+-----------+------------------+----------+---------+--------+
only showing top 5 rows



In [23]:
%time
# pandas
pop19_df = pop19_df.rename(columns={'name':'country'})
pop19_df.head()

CPU times: user 6 µs, sys: 1e+03 ns, total: 7 µs
Wall time: 27.7 µs


Unnamed: 0,Rank,country,pop2019(k),pop2018,GrowthRate,area,Density
0,1,China,1433783.686,1428214.0,1.0039,9706961.0,147.7068
1,2,India,1366417.754,1353023.0,1.0099,3287590.0,415.629
2,3,US,329064.917,327134.8,1.0059,9372610.0,35.1092
3,4,Indonesia,270625.568,267760.5,1.0107,1904569.0,142.0928
4,5,Pakistan,216565.318,212318.9,1.02,881912.0,245.5634


### Joining

In [24]:
%time
# pyspark
merged_sp = pop19_sp.join(gdp19_sp, on='country')
merged_sp.show()

CPU times: user 6 µs, sys: 1e+03 ns, total: 7 µs
Wall time: 31.9 µs
+----------+----+-----------+------------------+----------+-----------+---------+-------------+
|   country|Rank| pop2019(k)|           pop2018|GrowthRate|       area|  Density|    continent|
+----------+----+-----------+------------------+----------+-----------+---------+-------------+
|    Russia|   9| 145872.256|145813.93042782886|    1.0004|1.7098242E7|   8.5314|       Europe|
|     China|   1|1433783.686|1428213.6527542584|    1.0039|  9706961.0| 147.7068|         Asia|
|     India|   2|1366417.754|1353022.8280027725|    1.0099|  3287590.0|  415.629|         Asia|
|   Nigeria|   7| 200963.599|195909.14310781826|    1.0258|   923768.0| 217.5477|       Africa|
|Bangladesh|   8| 163046.161| 161415.8608058608|    1.0101|   147570.0|1104.8734|         Asia|
|        US|   3| 329064.917|327134.82155283826|    1.0059|  9372610.0|  35.1092|North America|
| Indonesia|   4| 270625.568|  267760.530325517|    1.0107|  1904569

In [25]:
%time
# pandas
merged_df = pd.merge(pop19_df, gdp19_df, on='country')
merged_df

CPU times: user 4 µs, sys: 1e+03 ns, total: 5 µs
Wall time: 22.4 µs


Unnamed: 0,Rank,country,pop2019(k),pop2018,GrowthRate,area,Density,continent
0,1,China,1433783.686,1428214.0,1.0039,9706961.0,147.7068,Asia
1,2,India,1366417.754,1353023.0,1.0099,3287590.0,415.629,Asia
2,3,US,329064.917,327134.8,1.0059,9372610.0,35.1092,North America
3,4,Indonesia,270625.568,267760.5,1.0107,1904569.0,142.0928,Asia
4,5,Pakistan,216565.318,212318.9,1.02,881912.0,245.5634,Asia
5,6,Brazil,211049.527,209540.8,1.0072,8515767.0,24.7834,South America
6,7,Nigeria,200963.599,195909.1,1.0258,923768.0,217.5477,Africa
7,8,Bangladesh,163046.161,161415.9,1.0101,147570.0,1104.8734,Asia
8,9,Russia,145872.256,145813.9,1.0004,17098242.0,8.5314,Europe


### Create a new dataframe

In [26]:
%time
# pandas
new_df = pd.DataFrame({'country': ['China', 'China', 'US', 'US'],
                       'year': [2018, 2019, 2018, 2019],
                       'population(k)':[1428213.653, 1433783.686, 327134.822, 329064.917]})
new_df

CPU times: user 8 µs, sys: 2 µs, total: 10 µs
Wall time: 30 µs


Unnamed: 0,country,year,population(k)
0,China,2018,1428213.653
1,China,2019,1433783.686
2,US,2018,327134.822
3,US,2019,329064.917


In [27]:
%time
# pyspark
new_sp = spark_session.createDataFrame(new_df)
new_sp.show()

CPU times: user 4 µs, sys: 1e+03 ns, total: 5 µs
Wall time: 23.1 µs
+-------+----+-------------+
|country|year|population(k)|
+-------+----+-------------+
|  China|2018|  1428213.653|
|  China|2019|  1433783.686|
|     US|2018|   327134.822|
|     US|2019|   329064.917|
+-------+----+-------------+



### Pivot table

In [None]:
%time
# pyspark


In [None]:
%time
# pandas
