In [1]:
! python --version

Python 3.8.12


In [2]:
!pip install pyspark

Collecting pyspark
  Using cached pyspark-3.2.1-py2.py3-none-any.whl
Collecting py4j==0.10.9.3
  Using cached py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


## Pyspark Introduction

In [2]:
import pyspark

In [5]:
!pip install pandas

Collecting pandas
  Downloading pandas-1.4.0-cp38-cp38-win_amd64.whl (10.6 MB)
Collecting numpy>=1.18.5
  Downloading numpy-1.22.2-cp38-cp38-win_amd64.whl (14.7 MB)
Collecting pytz>=2020.1
  Downloading pytz-2021.3-py2.py3-none-any.whl (503 kB)
Installing collected packages: pytz, numpy, pandas
Successfully installed numpy-1.22.2 pandas-1.4.0 pytz-2021.3


In [12]:
!pip install openpyxl

Collecting openpyxl
  Downloading openpyxl-3.0.9-py2.py3-none-any.whl (242 kB)
Collecting et-xmlfile
  Downloading et_xmlfile-1.1.0-py3-none-any.whl (4.7 kB)
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-1.1.0 openpyxl-3.0.9


In [3]:
import pandas as pd

In [17]:
pandas_df = pd.read_csv('test1.csv')
pandas_df

Unnamed: 0,name;age
0,Jasmin;27
1,Marcus;29
2,Vanessa;25
3,Christine;51


In [5]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [7]:
spark

In [30]:
df_pyspark = spark.read.csv('test1.csv')

In [40]:
df_pyspark.printSchema()

root
 |-- name;age: string (nullable = true)



In [32]:
df_pyspark.show()

+------------+
|         _c0|
+------------+
|    name;age|
|   Jasmin;27|
|   Marcus;29|
|  Vanessa;25|
|Christine;51|
+------------+



make first row column names, ; as seperator so we don't mix up columns

In [41]:
df_pyspark = spark.read.csv('test1.csv', header=True, sep=";")

In [44]:
df_pyspark.show()

+---------+---+
|     name|age|
+---------+---+
|   Jasmin| 27|
|   Marcus| 29|
|  Vanessa| 25|
|Christine| 51|
+---------+---+



In [42]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [43]:
df_pyspark.head(1)

[Row(name='Jasmin', age='27')]

In [39]:
df_pyspark.printSchema()

root
 |-- name;age: string (nullable = true)



## PySpark DataFrames Part 1
### reading dataset, checking the datatypes of the column (Schema), selecting columns and indexing, check describe option similar to pandas, adding columns, dropping columns

let's see different ways to read in data

In [50]:
spark

In [70]:
# read dataset
df1 = spark.read.csv('test1.csv', header=True, sep=";", inferSchema=True)

In [71]:
# Check schema
df1.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Alter: integer (nullable = true)
 |-- Skills: integer (nullable = true)



Why is it reading in my ints as strings? --> add the inferSchema option above in the read function!

In [72]:
df1 = spark.read.csv('test1.csv', header=True, sep=";", inferSchema=True)
df1.show()

+-------+-----+------+
|   Name|Alter|Skills|
+-------+-----+------+
| Marcus|   29|   100|
|Vanessa|   25|    90|
| Jasmin|   27|    80|
+-------+-----+------+



In [73]:
type(df1)

pyspark.sql.dataframe.DataFrame

In [75]:
#select certain columns
df1.select(['Name', 'Skills']).show()

+-------+------+
|   Name|Skills|
+-------+------+
| Marcus|   100|
|Vanessa|    90|
| Jasmin|    80|
+-------+------+



In [76]:
#show datatypes of all columns
df1.dtypes

[('Name', 'string'), ('Alter', 'int'), ('Skills', 'int')]

In [78]:
#check the describe options - gives out statistics about my dataframe
df1.describe().show()

+-------+-------+-----+------+
|summary|   Name|Alter|Skills|
+-------+-------+-----+------+
|  count|      3|    3|     3|
|   mean|   null| 27.0|  90.0|
| stddev|   null|  2.0|  10.0|
|    min| Jasmin|   25|    80|
|    max|Vanessa|   29|   100|
+-------+-------+-----+------+



In [88]:
#adding columns --> add new column and add 2 two values of old column Skills
df1=df1.withColumn('Skills After Two Years', df1['Skills']+2)
df1.show()

+-------+-----+------+----------------------+
|   Name|Alter|Skills|Skills After Two Years|
+-------+-----+------+----------------------+
| Marcus|   29|   100|                   102|
|Vanessa|   25|    90|                    92|
| Jasmin|   27|    80|                    82|
+-------+-----+------+----------------------+



In [89]:
#drop columns
df1 = df1.drop('Alter')
df1.show()

+-------+------+----------------------+
|   Name|Skills|Skills After Two Years|
+-------+------+----------------------+
| Marcus|   100|                   102|
|Vanessa|    90|                    92|
| Jasmin|    80|                    82|
+-------+------+----------------------+



In [92]:
#rename the columns
df1 = df1.withColumnRenamed('Name', 'Vorname')
df1.show()

+-------+------+----------------------+
|Vorname|Skills|Skills After Two Years|
+-------+------+----------------------+
| Marcus|   100|                   102|
|Vanessa|    90|                    92|
| Jasmin|    80|                    82|
+-------+------+----------------------+



## PySpark Handling missing values
### dropping columns, dropping rows, various parameter in dropping functionalities, handling missing values by Mean, Median and Mode

In [96]:
df2 = spark.read.csv('test2.csv', header=True, inferSchema=True, sep=';')

In [97]:
df2.show()   #blanks are filled in with null

+------+----+--------+--------+
|  Name| Age|YearsJob|YearsUni|
+------+----+--------+--------+
|  Anna|  20|       0|       2|
|  Yana|  30|       5|       4|
|Marcus|  29|       3|       7|
|  Paul|  35|      10|       5|
|Emilia|null|    null|       3|
|  null|  40|      15|       3|
|  null|  18|    null|    null|
+------+----+--------+--------+



In [99]:
#drop column
df2.drop('Name').show()

+----+--------+--------+
| Age|YearsJob|YearsUni|
+----+--------+--------+
|  20|       0|       2|
|  30|       5|       4|
|  29|       3|       7|
|  35|      10|       5|
|null|    null|       3|
|  40|      15|       3|
|  18|    null|    null|
+----+--------+--------+



In [100]:
# drop ROWS based on null values - without arg in drop() - drops all rows where there is a null value
df2.na.drop().show()

+------+---+--------+--------+
|  Name|Age|YearsJob|YearsUni|
+------+---+--------+--------+
|  Anna| 20|       0|       2|
|  Yana| 30|       5|       4|
|Marcus| 29|       3|       7|
|  Paul| 35|      10|       5|
+------+---+--------+--------+



In [102]:
# drop(how="all")
df2.na.drop(how="all").show() # only drops where we have ALL NULLS in a row - will not drop any row here

+------+----+--------+--------+
|  Name| Age|YearsJob|YearsUni|
+------+----+--------+--------+
|  Anna|  20|       0|       2|
|  Yana|  30|       5|       4|
|Marcus|  29|       3|       7|
|  Paul|  35|      10|       5|
|Emilia|null|    null|       3|
|  null|  40|      15|       3|
|  null|  18|    null|    null|
+------+----+--------+--------+



In [103]:
# drop(how = "any")
df2.na.drop(how="any").show()  # drops where we have any null value - so same as not specifying drop()

+------+---+--------+--------+
|  Name|Age|YearsJob|YearsUni|
+------+---+--------+--------+
|  Anna| 20|       0|       2|
|  Yana| 30|       5|       4|
|Marcus| 29|       3|       7|
|  Paul| 35|      10|       5|
+------+---+--------+--------+



In [104]:
# threshold - at least x non null values - only drops last row with 1 non null value
df2.na.drop(how="any", thresh=2).show()

+------+----+--------+--------+
|  Name| Age|YearsJob|YearsUni|
+------+----+--------+--------+
|  Anna|  20|       0|       2|
|  Yana|  30|       5|       4|
|Marcus|  29|       3|       7|
|  Paul|  35|      10|       5|
|Emilia|null|    null|       3|
|  null|  40|      15|       3|
+------+----+--------+--------+



In [105]:
# drop(subset)
df2.na.drop(how="any", subset=['YearsJob']).show()   #only drop null values from column YearsJob

+------+---+--------+--------+
|  Name|Age|YearsJob|YearsUni|
+------+---+--------+--------+
|  Anna| 20|       0|       2|
|  Yana| 30|       5|       4|
|Marcus| 29|       3|       7|
|  Paul| 35|      10|       5|
|  null| 40|      15|       3|
+------+---+--------+--------+



In [114]:
# fill missing value
df2.na.fill(value='xxxxx', subset=['Name', 'Age']).show()

+------+----+--------+--------+
|  Name| Age|YearsJob|YearsUni|
+------+----+--------+--------+
|  Anna|  20|       0|       2|
|  Yana|  30|       5|       4|
|Marcus|  29|       3|       7|
|  Paul|  35|      10|       5|
|Emilia|null|    null|       3|
| xxxxx|  40|      15|       3|
| xxxxx|  18|    null|    null|
+------+----+--------+--------+



In [116]:
# fill missing value with the MEAN of the specific column with an IMPUTER function
df2.show()

+------+----+--------+--------+
|  Name| Age|YearsJob|YearsUni|
+------+----+--------+--------+
|  Anna|  20|       0|       2|
|  Yana|  30|       5|       4|
|Marcus|  29|       3|       7|
|  Paul|  35|      10|       5|
|Emilia|null|    null|       3|
|  null|  40|      15|       3|
|  null|  18|    null|    null|
+------+----+--------+--------+



In [122]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['Age', 'YearsJob', 'YearsUni'],
    outputCols=["{}_imputed".format(c) for c in ['Age', 'YearsJob', 'YearsUni']] #this creates "new" cols with old name +_imputed
    ).setStrategy("median")     #strategy can be mean, median, mode, etc

In [123]:
# add imputation columns to df - now we could "drop" the old cols
imputer.fit(df2).transform(df2).show()

+------+----+--------+--------+-----------+----------------+----------------+
|  Name| Age|YearsJob|YearsUni|Age_imputed|YearsJob_imputed|YearsUni_imputed|
+------+----+--------+--------+-----------+----------------+----------------+
|  Anna|  20|       0|       2|         20|               0|               2|
|  Yana|  30|       5|       4|         30|               5|               4|
|Marcus|  29|       3|       7|         29|               3|               7|
|  Paul|  35|      10|       5|         35|              10|               5|
|Emilia|null|    null|       3|         29|               5|               3|
|  null|  40|      15|       3|         40|              15|               3|
|  null|  18|    null|    null|         18|               5|               3|
+------+----+--------+--------+-----------+----------------+----------------+



## PySpark Dataframe Part 2

## PySpark Groupby And Aggregate Functions

## PySpark Mlib - Installation and Implementation

## Introduction to Databricks

## (Implementing Linear Regression using Databricks in Single Clusters