# PySpark Tutorial

In [1]:
# Import modules

import os
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession
from sklearn.datasets import make_classification

In [2]:
# Create spark session

# Spark needs a JVM; point it at the JDK 8 install on this machine
# (machine-specific path - adjust when running elsewhere).
os.environ['JAVA_HOME'] = '/usr/lib/jvm/openlogic-openjdk-8-hotspot-amd64'

# Go through the builder directly: getOrCreate() returns the active session if
# one exists, so this cell can be re-run safely. Constructing SparkContext()
# by hand raises once a context is already running in the kernel.
spark = SparkSession.builder.getOrCreate()
spark

In [42]:
# Generate a synthetic binary-classification dataset:
# X holds the feature matrix, y the class labels.

sample_params = dict(
    n_samples=10000,
    n_features=4,
    n_classes=2,
    shuffle=True,
    random_state=1,  # fixed seed so the notebook is reproducible
)
X, y = make_classification(**sample_params)

In [43]:
# Build a pandas frame from the samples, then convert it to a Spark dataframe

feature_names = ['feature_1', 'feature_2', 'feature_3', 'feature_4']
table = (
    pd.DataFrame(X, columns=feature_names)
    .assign(
        target=y,
        # Round feature_3 to one decimal so it can be grouped on later
        feature_3=lambda frame: frame['feature_3'].round(1),
    )
)
df = spark.createDataFrame(table)

In [44]:
# Print the name, type and nullability of every column in the Spark dataframe

df.printSchema()

root
 |-- feature_1: double (nullable = true)
 |-- feature_2: double (nullable = true)
 |-- feature_3: double (nullable = true)
 |-- feature_4: double (nullable = true)
 |-- target: long (nullable = true)



In [45]:
# Display the first 20 rows without shortening long cell values

df.show(n=20, truncate=False)

+--------------------+--------------------+---------+--------------------+------+
|feature_1           |feature_2           |feature_3|feature_4           |target|
+--------------------+--------------------+---------+--------------------+------+
|-1.8873649371603203 |-1.1455691898002351 |0.8      |-2.008855708086743  |0     |
|-0.18266809216582025|-0.12226678277057923|0.1      |-0.2054662226628986 |0     |
|-0.7315948349672106 |0.6559036936823883  |0.2      |0.28714060384238044 |0     |
|-0.7749652163170958 |0.7440265567629247  |0.2      |0.35187466485355684 |0     |
|-1.3394227045324436 |-1.0424630852864827 |0.6      |-1.6479989577746343 |0     |
|-0.18017499772535683|-1.6244897462240875 |0.2      |-1.6598910893214862 |1     |
|0.82509156468382    |-0.5728119479422702 |-0.2     |-0.16210051159032354|0     |
|1.1781029789531323  |0.17113092183663403 |-0.5     |0.7268761264280708  |1     |
|-1.2057458102820222 |1.4411211027600361  |0.3      |0.8221866229867812  |0     |
|-0.230668107813

In [46]:
# Shape of the dataframe as (rows, columns)

n_rows = df.count()        # row count is computed by Spark
n_cols = len(df.columns)   # column names are available locally
print(f'shape: ({n_rows}, {n_cols})')

shape: (10000, 5)


In [47]:
# Statistics summary (count, mean, stddev, min, max) for every column

df.describe().show()

+-------+--------------------+--------------------+--------------------+--------------------+------------------+
|summary|           feature_1|           feature_2|           feature_3|           feature_4|            target|
+-------+--------------------+--------------------+--------------------+--------------------+------------------+
|  count|               10000|               10000|               10000|               10000|             10000|
|   mean|-3.46424099960548...|3.155528014314736E-4|2.400000000000085...|1.407822436023929E-4|            0.5001|
| stddev|  1.3539394793242931|   1.236669413066441|  0.5370780089323783|   1.388316589602154|0.5000249918746562|
|    min|  -4.066746338839522| -2.9128501224788117|                -1.8|   -3.96913949312817|                 0|
|    max|   4.699669037396011|   4.861123080329538|                 1.7|   5.097732594016294|                 1|
+-------+--------------------+--------------------+--------------------+--------------------+---

In [48]:
# Summary statistics restricted to a single column

feature_2_only = df.select('feature_2')
feature_2_only.describe().show()

+-------+--------------------+
|summary|           feature_2|
+-------+--------------------+
|  count|               10000|
|   mean|3.155528014314736E-4|
| stddev|   1.236669413066441|
|    min| -2.9128501224788117|
|    max|   4.861123080329538|
+-------+--------------------+



In [49]:
# Keep only a subset of the columns

wanted_columns = ['feature_1', 'target']
df.select(*wanted_columns).show()

+--------------------+------+
|           feature_1|target|
+--------------------+------+
| -1.8873649371603203|     0|
|-0.18266809216582025|     0|
| -0.7315948349672106|     0|
| -0.7749652163170958|     0|
| -1.3394227045324436|     0|
|-0.18017499772535683|     1|
|    0.82509156468382|     0|
|  1.1781029789531323|     1|
| -1.2057458102820222|     0|
|-0.23066810781360036|     0|
| -1.0072234845517323|     0|
| 0.47538225794367417|     1|
| -0.2131515767790706|     1|
| -1.5496574364883247|     0|
| -1.1086769809797885|     0|
|  1.5405802242318427|     1|
|  0.2892416515109355|     0|
| 0.29676943127701305|     1|
|  1.3199355367253436|     1|
|  -2.125342086258849|     0|
+--------------------+------+
only showing top 20 rows



In [50]:
# Count the number of unique values in the target column

target_values = df.select('target').distinct()
target_values.count()

2

In [52]:
# Pairwise frequency table: one row per feature_3 value, one column per target value

df.stat.crosstab('feature_3', 'target').show()

+----------------+---+---+
|feature_3_target|  0|  1|
+----------------+---+---+
|             0.0|252|292|
|             1.5|  8|  0|
|            -1.5|  0| 16|
|             0.3|918|133|
|            -0.3| 21|443|
|             0.6|315| 39|
|            -0.6|  3|392|
|            -1.3|  0| 49|
|             1.3| 19|  1|
|            -1.2|  0| 79|
|             1.0| 95|  2|
|             1.2| 54|  1|
|            -0.2| 34|434|
|             0.2|758|189|
|            -0.4| 19|453|
|             0.4|721| 94|
|            -0.8|  2|305|
|             0.8|180| 14|
|            -1.6|  0|  7|
|             1.6|  2|  0|
+----------------+---+---+
only showing top 20 rows



In [53]:
# Drop duplicates: count the distinct (feature_3, target) combinations

key_columns = ['feature_3', 'target']
df.dropDuplicates(key_columns).count()

60

In [55]:
# Drop rows that contain missing values, then count what is left

df.na.drop().count()

10000

In [56]:
# Replace missing values with 0

df.na.fill(0).show()

+--------------------+--------------------+---------+--------------------+------+
|           feature_1|           feature_2|feature_3|           feature_4|target|
+--------------------+--------------------+---------+--------------------+------+
| -1.8873649371603203| -1.1455691898002351|      0.8|  -2.008855708086743|     0|
|-0.18266809216582025|-0.12226678277057923|      0.1| -0.2054662226628986|     0|
| -0.7315948349672106|  0.6559036936823883|      0.2| 0.28714060384238044|     0|
| -0.7749652163170958|  0.7440265567629247|      0.2| 0.35187466485355684|     0|
| -1.3394227045324436| -1.0424630852864827|      0.6| -1.6479989577746343|     0|
|-0.18017499772535683| -1.6244897462240875|      0.2| -1.6598910893214862|     1|
|    0.82509156468382| -0.5728119479422702|     -0.2|-0.16210051159032354|     0|
|  1.1781029789531323| 0.17113092183663403|     -0.5|  0.7268761264280708|     1|
| -1.2057458102820222|  1.4411211027600361|      0.3|  0.8221866229867812|     0|
|-0.230668107813

In [58]:
# Keep only the rows of the positive class (target == 1)

is_positive = df['target'] == 1
df.where(is_positive).show()

+--------------------+--------------------+---------+--------------------+------+
|           feature_1|           feature_2|feature_3|           feature_4|target|
+--------------------+--------------------+---------+--------------------+------+
|-0.18017499772535683| -1.6244897462240875|      0.2| -1.6598910893214862|     1|
|  1.1781029789531323| 0.17113092183663403|     -0.5|  0.7268761264280708|     1|
| 0.47538225794367417|  1.7486569358975648|     -0.4|  1.9207939550154742|     1|
| -0.2131515767790706|  -1.748906639130869|      0.3| -1.7961522233276521|     1|
|  1.5405802242318427| -0.7978912930416052|     -0.5|-0.03945467385592581|     1|
| 0.29676943127701305| -1.3502191165401722|      0.0| -1.1669923954311183|     1|
|  1.3199355367253436|     1.4688067175103|     -0.7|   2.051833559881125|     1|
| 0.11153035574937986|  -1.598210205756609|      0.1| -1.4955063605051886|     1|
|  2.5329084737138445|  3.2571767106701675|     -1.3|   4.362375584875162|     1|
|  0.86570810752

In [59]:
# Number of rows for each distinct feature_3 value

rows_per_value = df.groupBy('feature_3').count()
rows_per_value.show()

+---------+-----+
|feature_3|count|
+---------+-----+
|     -1.2|   79|
|      0.0|  544|
|     -1.4|   31|
|     -1.3|   49|
|     -1.7|    3|
|      0.2|  947|
|     -0.1|  488|
|     -1.0|  173|
|      1.4|   13|
|      1.7|    1|
|      0.7|  263|
|     -0.9|  262|
|      0.1|  722|
|     -1.8|    3|
|     -1.1|  112|
|      1.0|   97|
|      0.6|  354|
|      0.8|  194|
|     -0.7|  343|
|     -0.3|  464|
+---------+-----+
only showing top 20 rows



In [62]:
# Group by with a different aggregation per column (mean of feature_1, max of feature_2)
# Note: the result columns do not necessarily follow the dict's key order.

df.groupby('feature_3').agg({
    'feature_1': 'mean',
    'feature_2': 'max'
}).show()

+---------+-------------------+-------------------+
|feature_3|     max(feature_2)|     avg(feature_1)|
+---------+-------------------+-------------------+
|     -1.2| 3.4286138446305863| 2.8567896648621165|
|      0.0| 3.2580554555397163|0.13651659417941878|
|     -1.4|  3.367473777386891|  3.333235365936271|
|     -1.3| 3.9594718756003084| 3.0398711984217863|
|     -1.7| 2.2967488854180944|  3.973725619737477|
|      0.2| 3.8510846722195553|-0.5710866586673432|
|     -0.1|  4.148950280260009|0.38281061465031235|
|     -1.0| 3.7485182564556303|  2.383973432507364|
|      1.4| -1.431868823352709| -3.240292827332823|
|      1.7|-1.5771327131120745| -4.066746338839522|
|      0.7| 3.2243694266618226|  -1.52981943639022|
|     -0.9|   4.72440677839553| 2.1959393316218647|
|      0.1| 2.9513813293000704|-0.2207497454783894|
|     -1.8|  2.367472364142598|   4.32827402097519|
|     -1.1|  4.315857310503869|  2.580174417691204|
|      1.0|-1.1631112106851182|  -2.26922644950199|
|      0.6| 

In [70]:
# Random sub-sample of roughly 10% of the rows
# (the fraction is approximate, so the count is not exactly 1000)

subsample = df.sample(fraction=0.1, seed=1)
subsample.count()

1049

In [88]:
# Limit samples: keep at most 1000 rows

df.limit(1000).count()

1000

In [78]:
# Sort the rows by feature_1, smallest value first

ascending_feature_1 = df['feature_1'].asc()
df.sort(ascending_feature_1).show()

+-------------------+-------------------+---------+-------------------+------+
|          feature_1|          feature_2|feature_3|          feature_4|target|
+-------------------+-------------------+---------+-------------------+------+
| -4.066746338839522|-1.5771327131120745|      1.7| -3.464928581694748|     0|
|-3.8324145990133376|-1.7325482078312657|      1.6|-3.5039246556160557|     0|
|-3.8042407959227833| -1.689311798557477|      1.6|-3.4486124015978006|     0|
|-3.5915592801087297|-1.5212197690514027|      1.5| -3.184449320084664|     0|
| -3.561891433381981|-1.4968335596402964|      1.5|-3.1466908668238385|     0|
|-3.5594958578273355|-1.7389131584234274|      1.5| -3.380118389310072|     0|
|-3.5213738626890114|-1.5244911239996781|      1.5| -3.154194350311359|     0|
|-3.4826035634650494|  -1.58483263528437|      1.5|-3.1941997806568097|     0|
|-3.4768984149158335| -1.593603026986611|      1.5|-3.1999810486400775|     0|
|-3.4563744827381537|-1.4697994084759771|      1.5|-

In [83]:
# Replace an existing column
# withColumn overwrites 'feature_4' here because a column with that name
# already exists; pass an unused name to append a new column instead.
df.withColumn('feature_4', df.feature_3*10).show()

+--------------------+--------------------+---------+---------+------+
|           feature_1|           feature_2|feature_3|feature_4|target|
+--------------------+--------------------+---------+---------+------+
| -1.8873649371603203| -1.1455691898002351|      0.8|      8.0|     0|
|-0.18266809216582025|-0.12226678277057923|      0.1|      1.0|     0|
| -0.7315948349672106|  0.6559036936823883|      0.2|      2.0|     0|
| -0.7749652163170958|  0.7440265567629247|      0.2|      2.0|     0|
| -1.3394227045324436| -1.0424630852864827|      0.6|      6.0|     0|
|-0.18017499772535683| -1.6244897462240875|      0.2|      2.0|     1|
|    0.82509156468382| -0.5728119479422702|     -0.2|     -2.0|     0|
|  1.1781029789531323| 0.17113092183663403|     -0.5|     -5.0|     1|
| -1.2057458102820222|  1.4411211027600361|      0.3|      3.0|     0|
|-0.23066810781360036|-0.18246194798172555|      0.1|      1.0|     0|
| -1.0072234845517323|  0.8545722498639476|      0.3|      3.0|     0|
| 0.47

In [85]:
# Drop a column (returns a new dataframe; df itself keeps feature_1)

df.drop('feature_1').show()

+--------------------+---------+--------------------+------+
|           feature_2|feature_3|           feature_4|target|
+--------------------+---------+--------------------+------+
| -1.1455691898002351|      0.8|  -2.008855708086743|     0|
|-0.12226678277057923|      0.1| -0.2054662226628986|     0|
|  0.6559036936823883|      0.2| 0.28714060384238044|     0|
|  0.7440265567629247|      0.2| 0.35187466485355684|     0|
| -1.0424630852864827|      0.6| -1.6479989577746343|     0|
| -1.6244897462240875|      0.2| -1.6598910893214862|     1|
| -0.5728119479422702|     -0.2|-0.16210051159032354|     0|
| 0.17113092183663403|     -0.5|  0.7268761264280708|     1|
|  1.4411211027600361|      0.3|  0.8221866229867812|     0|
|-0.18246194798172555|      0.1|-0.28665299425314167|     0|
|  0.8545722498639476|      0.3|   0.348380613815614|     0|
|  1.7486569358975648|     -0.4|  1.9207939550154742|     1|
|  -1.748906639130869|      0.3| -1.7961522233276521|     1|
|   1.775625586462539|  

In [99]:
# Rename a column while selecting it (only the selected column is kept)

y_true_column = df['target'].alias('y_true')
df.select(y_true_column).show()

+------+
|y_true|
+------+
|     0|
|     0|
|     0|
|     0|
|     0|
|     1|
|     0|
|     1|
|     0|
|     0|
|     0|
|     1|
|     1|
|     0|
|     0|
|     1|
|     0|
|     1|
|     1|
|     0|
+------+
only showing top 20 rows



In [96]:
# Another way to rename a column: all other columns are kept as they are

df.withColumnRenamed('target', 'y_true').show()

+--------------------+--------------------+---------+--------------------+------+
|           feature_1|           feature_2|feature_3|           feature_4|y_true|
+--------------------+--------------------+---------+--------------------+------+
| -1.8873649371603203| -1.1455691898002351|      0.8|  -2.008855708086743|     0|
|-0.18266809216582025|-0.12226678277057923|      0.1| -0.2054662226628986|     0|
| -0.7315948349672106|  0.6559036936823883|      0.2| 0.28714060384238044|     0|
| -0.7749652163170958|  0.7440265567629247|      0.2| 0.35187466485355684|     0|
| -1.3394227045324436| -1.0424630852864827|      0.6| -1.6479989577746343|     0|
|-0.18017499772535683| -1.6244897462240875|      0.2| -1.6598910893214862|     1|
|    0.82509156468382| -0.5728119479422702|     -0.2|-0.16210051159032354|     0|
|  1.1781029789531323| 0.17113092183663403|     -0.5|  0.7268761264280708|     1|
| -1.2057458102820222|  1.4411211027600361|      0.3|  0.8221866229867812|     0|
|-0.230668107813

In [110]:
# Spark DF to pandas DF
# NOTE: toPandas() loads every row into the driver's memory, so only use it
# on dataframes that are small enough to fit there.

df.toPandas()

Unnamed: 0,feature_1,feature_2,feature_3,feature_4,target
0,-1.887365,-1.145569,0.8,-2.008856,0
1,-0.182668,-0.122267,0.1,-0.205466,0
2,-0.731595,0.655904,0.2,0.287141,0
3,-0.774965,0.744027,0.2,0.351875,0
4,-1.339423,-1.042463,0.6,-1.647999,0
...,...,...,...,...,...
9995,-1.278285,1.456590,0.3,0.802630,0
9996,1.093958,-0.247781,-0.4,0.280889,1
9997,-0.261232,-0.146434,0.1,-0.266299,0
9998,-1.288055,-0.955119,0.6,-1.538902,0
