# **Apache Spark Code**

# Setting up environment

## Installing PySpark

In [28]:
!pip install pyspark



In [48]:
# Checking PySpark version:

!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0.3.3.7180.0-274
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_382
Branch HEAD
Compiled by user  on 2022-08-30T12:57:40Z
Revision 73b4169950a4b83435306674db9e31e28529e8b5
Url git@github.infra.cloudera.com:CDH/spark3.git
Type --help for more information.


## Installing Java (for JVM)

In [None]:
!conda install -c cyclus java-jdk

Collecting package metadata (repodata.json): done
Solving environment: / 
The environment is inconsistent, please check the package plan carefully
The following packages are causing the inconsistency:

  - defaults/noarch::flask-jwt-extended==3.24.1=py_0
  - defaults/linux-64::anaconda==custom=py37_1
  - defaults/noarch::flask-babel==0.12.2=py_1
  - defaults/linux-64::flask-cors==3.0.7=py37_0
  - defaults/noarch::flask-admin==1.5.4=py_0
  - defaults/noarch::flask-sqlalchemy==2.4.4=py_0
  - defaults/linux-64::flask-login==0.4.1=py37_0
  - defaults/linux-64::flask-openid==1.2.5=py37_1003
  - defaults/linux-64::airflow==1.10.10=py37_0
  - defaults/noarch::flask-appbuilder==2.2.1=py_1
  - defaults/noarch::flask-swagger==0.2.13=py_2
  - defaults/linux-64::blaze==0.11.3=py37_0
  - defaults/noarch::flask-caching==1.9.0=py_0
  - defaults/noarch::flask==1.1.1=py_0
  - defaults/noarch::flask-wtf==0.14.3=py_0
  - defaults/linux-64::_anaconda_depends==2019.10=py37_0
done

## Package Plan ##

  env

## Kerberos login

In [8]:
# Closing all active Kerberos connections (tickets):
#!kdestroy

# Starting a Kerberos connection:
!kinit -kt login/Jakub.Cajzl.keytab Jakub.Cajzl

Exception: too many parameters
java.lang.IllegalArgumentException: too many parameters
	at sun.security.krb5.internal.tools.KinitOptions.<init>(KinitOptions.java:153)
	at sun.security.krb5.internal.tools.Kinit.<init>(Kinit.java:147)
	at sun.security.krb5.internal.tools.Kinit.main(Kinit.java:113)


In [9]:
# Displaying active tickets:
!klist

Credentials cache C:\Users\Jakub.Cajzl\krb5cc_Jakub.Cajzl not found.


## Libraries

In [1]:
# Importing libraries:
import sys, os
import pyspark
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession, DataFrame, SQLContext
# from pyspark.sql import types as T, functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import *

from datetime import datetime, date
import pandas as pd

# It is better to load pySpark version of Pandas as 'ps' instead of 'pd'
# because pySpark has differences in their version of Pandas 
# (mainly because pySpark is masivelly parallelized and has many 
# parallelization parameters).
# from pyspark import pandas as ps

## Spark session

In [2]:
spark = (SparkSession.builder
            .master("local")
            .appName("app_01")
            .getOrCreate()
            )

In [2]:
# Other option:
#sc = SparkContext("local")
#spark = SparkSession(sc)

# Main Code - Tasks

In the following sub-chapters and lines of code are tasks aimed to get familiar with Apache Spark DataFrame API and SQL API.

## 1) Create a DataFrame using createDataFrame()

In [3]:
import random
import string

# Defining schema:
my_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("surname", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("location", StringType(), True),
    StructField("height", FloatType(), True)
    ])

# 50 most common US male and female names:
male_names = ["James", "John", "Robert", "Michael", "William", "David", "Richard", "Joseph", "Thomas", "Charles", 
              "Christopher", "Daniel", "Matthew", "Anthony", "Mark", "Donald", "Steven", "Paul", "Andrew", "Joshua", 
              "Kenneth", "Kevin", "Brian", "George", "Edward", "Ronald", "Timothy", "Jason", "Jeffrey", "Ryan", 
              "Jacob", "Gary", "Nicholas", "Eric", "Jonathan", "Stephen", "Larry", "Justin", "Scott", "Brandon", 
              "Benjamin", "Samuel", "Gregory", "Frank", "Alexander", "Raymond", "Patrick", "Jack", "Dennis", "Jerry"]
              
female_names = ["Mary", "Patricia", "Jennifer", "Linda", "Elizabeth", "Barbara", "Susan", "Jessica", "Sarah", "Karen", 
                "Nancy", "Lisa", "Margaret", "Betty", "Sandra", "Ashley", "Kimberly", "Emily", "Donna", "Michelle", 
                "Dorothy", "Carol", "Amanda", "Melissa", "Deborah", "Stephanie", "Rebecca", "Sharon", "Laura", "Cynthia", 
                "Kathleen", "Amy", "Shirley", "Angela", "Helen", "Anna", "Brenda", "Pamela", "Nicole", "Samantha", 
                "Katherine", "Emma", "Ruth", "Christine", "Catherine", "Debra", "Rachel", "Carolyn", "Janet", "Virginia"]

# 100 most common US surnames:
surnames = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Rodriguez", "Martinez",
            "Hernandez", "Lopez", "Gonzalez", "Wilson", "Anderson", "Thomas", "Taylor", "Moore", "Jackson", "Martin",
            "Lee", "Perez", "Thompson", "White", "Harris", "Sanchez", "Clark", "Ramirez", "Lewis", "Robinson",
            "Walker", "Young", "Allen", "King", "Wright", "Scott", "Torres", "Nguyen", "Hill", "Flores",
            "Green", "Adams", "Nelson", "Baker", "Hall", "Rivera", "Campbell", "Mitchell", "Carter", "Roberts",
            "Gomez", "Phillips", "Evans", "Turner", "Diaz", "Parker", "Cruz", "Edwards", "Collins", "Reyes",
            "Stewart", "Morris", "Morales", "Murphy", "Cook", "Rogers", "Gutierrez", "Ortiz", "Morgan", "Cooper",
            "Peterson", "Bailey", "Reed", "Kelly", "Howard", "Ramos", "Kim", "Cox", "Ward", "Richardson",
            "Watson", "Brooks", "Chavez", "Wood", "James", "Bennett", "Gray", "Mendoza", "Ruiz", "Hughes",
            "Price", "Alvarez", "Castillo", "Sanders", "Patel", "Myers", "Long", "Ross", "Foster", "Jimenez"]

data = []

for i in range(1, 101):
    
    id = i
    
    sex = random.choice(['Male','Female'])
    
    # Creating random 50 male and 50 female names:
    if sex == 'Male':
        name = random.choice(male_names)
    elif sex == 'Female':
        name = random.choice(female_names)
        
    surname = random.choice(surnames)
        
    age = random.randint(18, 70)
        
    location = random.choice([
        'Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California', 'Colorado', 'Connecticut',
        'Delaware', 'Florida', 'Georgia', 'Hawaii', 'Idaho', 'Illinois', 'Indiana', 'Iowa',
        'Kansas', 'Kentucky', 'Louisiana', 'Maine', 'Maryland', 'Massachusetts', 'Michigan',
        'Minnesota', 'Mississippi', 'Missouri', 'Montana', 'Nebraska', 'Nevada', 'New Hampshire',
        'New Jersey', 'New Mexico', 'New York', 'North Carolina', 'North Dakota', 'Ohio', 'Oklahoma',
        'Oregon', 'Pennsylvania', 'Rhode Island', 'South Carolina', 'South Dakota', 'Tennessee',
        'Texas', 'Utah', 'Vermont', 'Virginia', 'Washington', 'West Virginia', 'Wisconsin', 'Wyoming'
        ])
        
    height = round(random.uniform(1.5, 2.0), 2)
    
    data.append((id, name, surname, sex, age, location, height))
    
# Creating DataFrame:
df = spark.createDataFrame(data, schema=my_schema)

## 2) Display schema of the DataFrame

In [4]:
print(df)

DataFrame[id: int, name: string, surname: string, sex: string, age: int, location: string, height: float]


In [5]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- height: float (nullable = true)



In [6]:
df.show(10)

Py4JJavaError: An error occurred while calling o38.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (Cloud-CQKD9G3.adastracorp.net executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more


## 3) Display the DataFrame and row count

In [7]:
df.show()

+---+--------+----------+------+---+------------+------+
| id|    name|   surname|   sex|age|    location|height|
+---+--------+----------+------+---+------------+------+
|  1|     Amy| Rodriguez|Female| 54|North Dakota|  1.65|
|  2| Deborah|  Campbell|Female| 31|     Indiana|  1.51|
|  3|   Debra|     White|Female| 45|       Maine|   1.5|
|  4|Margaret|     Smith|Female| 69|    Missouri|  1.92|
|  5|   Helen|    Morgan|Female| 22|     Vermont|  1.78|
|  6| Charles|    Morris|  Male| 28|     Georgia|  1.69|
|  7|   Emily|    Miller|Female| 28|South Dakota|  1.69|
|  8|    Mark|    Parker|  Male| 61|Rhode Island|  1.84|
|  9| Gregory|    Morris|  Male| 68|   Louisiana|   1.6|
| 10|  Brenda|      Cook|Female| 51|    Illinois|  1.57|
| 11|  Joseph|    Miller|  Male| 39|    Arkansas|  1.58|
| 12|    Emma|    Wilson|Female| 41|    Michigan|  1.58|
| 13|  Ashley|    Harris|Female| 63| Connecticut|  1.62|
| 14|Virginia|     Ramos|Female| 30|  New Mexico|   1.5|
| 15| Rebecca|      Hall|Female

In [58]:
# Showing only 3 top rows and truncating output to 3 characters:

df.show(n=3, truncate=3)

+---+----+-------+---+---+--------+------+
| id|name|surname|sex|age|location|height|
+---+----+-------+---+---+--------+------+
|  1| Jac|    Alv|Mal| 25|     Wis|   1.9|
|  2| Ben|    Tur|Mal| 52|     Sou|   1.6|
|  3| Dav|    Pet|Mal| 46|     Con|   1.7|
+---+----+-------+---+---+--------+------+
only showing top 3 rows



In [59]:
# Displaying the row count:
print(f"Row count: {df.count()}")

Row count: 100


In [60]:
# Counting males and females in the dataset:
males_count = df.filter(col('sex')=='Male').count()
females_count = df.filter(col('sex')=='Female').count()

print(f"Number of males: {males_count}\nNumber of females: {females_count}")

Number of males: 53
Number of females: 47


## 4) Save the created DataFrame in parquet and csv file

In [17]:
(df.write
    .format("parquet")
    .option("compression", "snappy")
    .mode("overwrite")
    .save("created_dataset_parquet")
    )

In [18]:
(df.write
    .format("csv")
    #.partitionBy("id")
    .option("header", True)
    .mode("overwrite")
    .save("created_dataset_csv")
    )

In [1]:
# Listing a main directory (should contain _csv and _parquet folders):
!ls

created_dataset_csv	 final_dataset.csv   PySpark_Kaggle_Dataset.ipynb
created_dataset_parquet  Jakub.Cajzl.keytab  Spark_Excercises.ipynb


## 5) Change a number of partitions and re-save the DataFrame

In [10]:
# Displaying a number of partitions:
print(f"Number of partitions for 'df' DataFrame: {df.rdd.getNumPartitions()}")

Number of partitions for 'df' DataFrame: 2


In [11]:
# Repartitioning to 1 partition:
df = df.repartition(1)

In [12]:
# Displaying the NEW number of partitions:
print(f"Number of partitions for 'df' DataFrame: {df.rdd.getNumPartitions()}")

Number of partitions for 'df' DataFrame: 1


In [16]:
# Re-saving a dataframe with only 1 partition:
(df.write
    .format("csv")
    .option("header", True)
    .mode("overwrite")
    .save("created_dataset_csv")
    )

In [2]:
# Listing a CSV directory (should contain 1 CSV file):
!ls created_dataset_csv

part-00000-65697176-6d7a-4864-adeb-03fd9ba0f7ad-c000.csv  _SUCCESS


## 6) Rename saved partitions

In [21]:
# Function for listing a directory:
def list_directory(path: str):
    '''
    Function for listing files in the given directory omitting hidden files and "_SUCCESS" file. 
    '''
    for item in os.listdir(path):
        if not (item.startswith(".") or item == "_SUCCESS"):
            return(item)

In [3]:
# Checking the saved datasets:
!ls

created_dataset_csv	 final_dataset.csv   PySpark_Kaggle_Dataset.ipynb
created_dataset_parquet  Jakub.Cajzl.keytab  Spark_Excercises.ipynb


In [4]:
# Checking the number of partitions:
!ls created_dataset_csv

part-00000-65697176-6d7a-4864-adeb-03fd9ba0f7ad-c000.csv  _SUCCESS


In [22]:
def rename_csv_partitions(directory: str, partition_name: str):
    '''
    Function for renaming saved default CSV partitions created by Spark.
    
    Parameters:
    - directory (string): a directory where CSV files are located
    - partition_name (string): new name for CSV partition(s) without a .csv extension
    '''
    
    new_name = f"{directory}/{partition_name}.csv"

    # Renaming all the CSV files in the directory:
    for item in os.listdir(directory):
        if item.endswith(".csv"):
            old_name = f"{directory}/{item}"
            os.rename(old_name, new_name)


# Renaming the partitions:
directory = "created_dataset_csv"
partition_name = "dataset"

#rename_csv_partitions(directory, partition_name)

# Listing the main directory with renamed .csv file(s):
print(f"Contents of the directory '{directory}':\n{list_directory(directory)}")

Contents of the directory 'created_dataset_csv':
dataset.csv


## 7) Load a CSV file without and with header

In [146]:
# Loading a CSV without a header:
df2 = spark.read.csv("created_dataset_csv")

df2.show()

+---+--------+---------+------+---+-------------+------+
|_c0|     _c1|      _c2|   _c3|_c4|          _c5|   _c6|
+---+--------+---------+------+---+-------------+------+
| id|    name|  surname|   sex|age|     location|height|
|  1|   Debra|   Walker|Female| 22|      Alabama|  1.67|
|  2|   Helen|  Jimenez|Female| 19| Pennsylvania|   1.6|
|  3|  Andrew|   Miller|  Male| 66|     Nebraska|  1.79|
|  4| Kenneth|   Thomas|  Male| 35|     New York|  1.68|
|  5|    John|   Miller|  Male| 69|   New Mexico|  1.61|
|  6|Benjamin|    Lewis|  Male| 61|     Michigan|  1.93|
|  7|   Linda|    Baker|Female| 45|     New York|  1.74|
|  8| Carolyn|    Lewis|Female| 31|      Vermont|  1.61|
|  9|    Gary|     Wood|  Male| 70|      Wyoming|  1.83|
| 10|Jennifer|   Harris|Female| 30|        Maine|  1.74|
| 11| Michael|Gutierrez|  Male| 31|West Virginia|  1.53|
| 12|   Linda|    Patel|Female| 44|      Arizona|  1.51|
| 13| Jessica|  Jimenez|Female| 59| North Dakota|  1.78|
| 14| Jeffrey|   Martin|  Male|

In [416]:
# Loading a CSV with a header:
path = "created_dataset_csv"
df = (spark.read
       .option("sep", ",")
       .option("header", True)
       .csv(path)
        )

df.show()

+---+---------+---------+------+----+-------------+------+
| id|     name|  surname|   sex| age|     location|height|
+---+---------+---------+------+----+-------------+------+
|  1|    Debra|   Walker|Female|22.0|      Alabama|  1.67|
|  2|    Helen|  Jimenez|Female|19.0| Pennsylvania|   1.6|
|  3|   Andrew|   Miller|  Male|66.0|     Nebraska|  1.79|
|  4|  Kenneth|   Thomas|  Male|35.0|     New York|  1.68|
|  5|     John|   Miller|  Male|69.0|   New Mexico|  1.61|
|  6| Benjamin|    Lewis|  Male|61.0|     Michigan|  1.93|
|  7|    Linda|    Baker|Female|45.0|     New York|  1.74|
|  8|  Carolyn|    Lewis|Female|31.0|      Vermont|  1.61|
|  9|     Gary|     Wood|  Male|70.0|      Wyoming|  1.83|
| 10| Jennifer|   Harris|Female|30.0|        Maine|  1.74|
| 11|  Michael|Gutierrez|  Male|31.0|West Virginia|  1.53|
| 12|    Linda|    Patel|Female|44.0|      Arizona|  1.51|
| 13|  Jessica|  Jimenez|Female|59.0| North Dakota|  1.78|
| 14|  Jeffrey|   Martin|  Male|18.0|    Tennessee|  1.6

In [127]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- height: float (nullable = true)



## 8) Loading a CSV with schema using inferSchema

In [142]:
# Loading a CSV using inferSchema():

path = "created_dataset_csv"
df2 = (spark.read
     .format("csv")
     .option("sep", ",")
     .option("header", True)
     .option("inferSchema", True)
     .load(path)
    )

# Displaying the schema:
print(df_struct.printSchema())

# Displaying the DataFrame:
df2.show(10)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- height: float (nullable = true)

None
+---+--------+-------+------+---+------------+------+
| id|    name|surname|   sex|age|    location|height|
+---+--------+-------+------+---+------------+------+
|  1|   Debra| Walker|Female| 22|     Alabama|  1.67|
|  2|   Helen|Jimenez|Female| 19|Pennsylvania|   1.6|
|  3|  Andrew| Miller|  Male| 66|    Nebraska|  1.79|
|  4| Kenneth| Thomas|  Male| 35|    New York|  1.68|
|  5|    John| Miller|  Male| 69|  New Mexico|  1.61|
|  6|Benjamin|  Lewis|  Male| 61|    Michigan|  1.93|
|  7|   Linda|  Baker|Female| 45|    New York|  1.74|
|  8| Carolyn|  Lewis|Female| 31|     Vermont|  1.61|
|  9|    Gary|   Wood|  Male| 70|     Wyoming|  1.83|
| 10|Jennifer| Harris|Female| 30|       Maine|  1.74|
+---+--------+-------+------+

## 9) Load a CSV file with pre-defined schema using DDL string

In [136]:
# Defining a DDL schema (SQL-like schema):

ddl_schema = """
    id INT,
    name STRING,
    surname STRING,
    sex STRING,
    age INT,
    location STRING,
    height FLOAT
"""

path = "created_dataset_csv"

df_ddl = (spark.read
          .format("csv")
          .option("header", True)
          .option("sep", ",")
          .schema(ddl_schema)
          .load(path)
            )

print(df_struct.printSchema())
df_ddl.show(10)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- height: float (nullable = true)

None
+---+--------+-------+------+---+------------+------+
| id|    name|surname|   sex|age|    location|height|
+---+--------+-------+------+---+------------+------+
|  1|   Debra| Walker|Female| 22|     Alabama|  1.67|
|  2|   Helen|Jimenez|Female| 19|Pennsylvania|   1.6|
|  3|  Andrew| Miller|  Male| 66|    Nebraska|  1.79|
|  4| Kenneth| Thomas|  Male| 35|    New York|  1.68|
|  5|    John| Miller|  Male| 69|  New Mexico|  1.61|
|  6|Benjamin|  Lewis|  Male| 61|    Michigan|  1.93|
|  7|   Linda|  Baker|Female| 45|    New York|  1.74|
|  8| Carolyn|  Lewis|Female| 31|     Vermont|  1.61|
|  9|    Gary|   Wood|  Male| 70|     Wyoming|  1.83|
| 10|Jennifer| Harris|Female| 30|       Maine|  1.74|
+---+--------+-------+------+

## 10) Load a CSV file with predefined schema using StructType

In [228]:
# Loading dataset CSV file with pre-defined StructType:

struct_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("surname", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("location", StringType(), True),
    StructField("height", FloatType(), True),
    ])

path = "created_dataset_csv"

df_struct = (spark.read
          .format("csv")
          .option("header", True)
          .option("sep", ",")
          .schema(struct_schema)
          .load(path)
            )

print(df_struct.printSchema())
df_struct.show(10)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- height: float (nullable = true)

None
+---+--------+-------+------+---+------------+------+
| id|    name|surname|   sex|age|    location|height|
+---+--------+-------+------+---+------------+------+
|  1|   Debra| Walker|Female| 22|     Alabama|  1.67|
|  2|   Helen|Jimenez|Female| 19|Pennsylvania|   1.6|
|  3|  Andrew| Miller|  Male| 66|    Nebraska|  1.79|
|  4| Kenneth| Thomas|  Male| 35|    New York|  1.68|
|  5|    John| Miller|  Male| 69|  New Mexico|  1.61|
|  6|Benjamin|  Lewis|  Male| 61|    Michigan|  1.93|
|  7|   Linda|  Baker|Female| 45|    New York|  1.74|
|  8| Carolyn|  Lewis|Female| 31|     Vermont|  1.61|
|  9|    Gary|   Wood|  Male| 70|     Wyoming|  1.83|
| 10|Jennifer| Harris|Female| 30|       Maine|  1.74|
+---+--------+-------+------+

## 11) Load a DataFrame from parquet file and select only some columns

In [147]:
df2 = spark.read.parquet("created_dataset.parquet").select(["name","surname"])
df2.show(10)

+--------+-------+
|    name|surname|
+--------+-------+
|   Debra| Walker|
|   Helen|Jimenez|
|  Andrew| Miller|
| Kenneth| Thomas|
|    John| Miller|
|Benjamin|  Lewis|
|   Linda|  Baker|
| Carolyn|  Lewis|
|    Gary|   Wood|
|Jennifer| Harris|
+--------+-------+
only showing top 10 rows



## 12) Transformations

In [138]:
# Displaying the DataFrame before transformations:
df.show(10)

+---+--------+-------+------+---+------------+------+
| id|    name|surname|   sex|age|    location|height|
+---+--------+-------+------+---+------------+------+
|  1|   Debra| Walker|Female| 22|     Alabama|  1.67|
|  2|   Helen|Jimenez|Female| 19|Pennsylvania|   1.6|
|  3|  Andrew| Miller|  Male| 66|    Nebraska|  1.79|
|  4| Kenneth| Thomas|  Male| 35|    New York|  1.68|
|  5|    John| Miller|  Male| 69|  New Mexico|  1.61|
|  6|Benjamin|  Lewis|  Male| 61|    Michigan|  1.93|
|  7|   Linda|  Baker|Female| 45|    New York|  1.74|
|  8| Carolyn|  Lewis|Female| 31|     Vermont|  1.61|
|  9|    Gary|   Wood|  Male| 70|     Wyoming|  1.83|
| 10|Jennifer| Harris|Female| 30|       Maine|  1.74|
+---+--------+-------+------+---+------------+------+
only showing top 10 rows



### Selecting and dropping columns

In [213]:
# Selecting 3 columns from a DataFrame:
(df
 .select(["id","surname","age"])
 .show(10)
)

+---+-------+---+
| id|surname|age|
+---+-------+---+
|  1| Walker| 22|
|  2|Jimenez| 19|
|  3| Miller| 66|
|  4| Thomas| 35|
|  5| Miller| 69|
+---+-------+---+
only showing top 5 rows



In [221]:
# Selecting 5 columns from a DataFrame by dropping 2 columns:
(df
 .drop("id","location")
 .show(10)
)

+--------+-------+------+---+------+
|    name|surname|   sex|age|height|
+--------+-------+------+---+------+
|   Debra| Walker|Female| 22|  1.67|
|   Helen|Jimenez|Female| 19|   1.6|
|  Andrew| Miller|  Male| 66|  1.79|
| Kenneth| Thomas|  Male| 35|  1.68|
|    John| Miller|  Male| 69|  1.61|
|Benjamin|  Lewis|  Male| 61|  1.93|
|   Linda|  Baker|Female| 45|  1.74|
| Carolyn|  Lewis|Female| 31|  1.61|
|    Gary|   Wood|  Male| 70|  1.83|
|Jennifer| Harris|Female| 30|  1.74|
+--------+-------+------+---+------+
only showing top 10 rows



### Renaming columns

In [150]:
(df
 .withColumnRenamed("id", "customer_id")
 .withColumnRenamed("sex", "gender")
 .withColumnRenamed("location", "state")
 .show(10)
    )

+-----------+--------+-------+------+---+------------+------+
|customer_id|    name|surname|gender|age|       state|height|
+-----------+--------+-------+------+---+------------+------+
|          1|   Debra| Walker|Female| 22|     Alabama|  1.67|
|          2|   Helen|Jimenez|Female| 19|Pennsylvania|   1.6|
|          3|  Andrew| Miller|  Male| 66|    Nebraska|  1.79|
|          4| Kenneth| Thomas|  Male| 35|    New York|  1.68|
|          5|    John| Miller|  Male| 69|  New Mexico|  1.61|
|          6|Benjamin|  Lewis|  Male| 61|    Michigan|  1.93|
|          7|   Linda|  Baker|Female| 45|    New York|  1.74|
|          8| Carolyn|  Lewis|Female| 31|     Vermont|  1.61|
|          9|    Gary|   Wood|  Male| 70|     Wyoming|  1.83|
|         10|Jennifer| Harris|Female| 30|       Maine|  1.74|
+-----------+--------+-------+------+---+------------+------+
only showing top 10 rows



In [223]:
# Renaming columns by "select" + "alias":
(df
 .select(
     col("id").alias("customer_id"),
     col("sex").alias("gender"),
     col("location").alias("state")
     )
 .show(10)
    )

+-----------+------+------------+
|customer_id|gender|       state|
+-----------+------+------------+
|          1|Female|     Alabama|
|          2|Female|Pennsylvania|
|          3|  Male|    Nebraska|
|          4|  Male|    New York|
|          5|  Male|  New Mexico|
|          6|  Male|    Michigan|
|          7|Female|    New York|
|          8|Female|     Vermont|
|          9|  Male|     Wyoming|
|         10|Female|       Maine|
+-----------+------+------------+
only showing top 10 rows



### Splitting column content

In [210]:
from pyspark.sql.functions import split, length

(df
 .withColumn("height (parts)", split(col("height"), "\."))  # Dot is in regex as "\."
 .withColumn("height (m part)", split(col("height"), "\.").getItem(0))  # Using .getItem()
 .withColumn("height (cm part)", split(col("height"), "\.")[1])         # Using [x]
 .show(10)
    )

+---+--------+-------+------+---+------------+------+--------------+---------------+----------------+
| id|    name|surname|   sex|age|    location|height|height (parts)|height (m part)|height (cm part)|
+---+--------+-------+------+---+------------+------+--------------+---------------+----------------+
|  1|   Debra| Walker|Female| 22|     Alabama|  1.67|       [1, 67]|              1|              67|
|  2|   Helen|Jimenez|Female| 19|Pennsylvania|   1.6|        [1, 6]|              1|               6|
|  3|  Andrew| Miller|  Male| 66|    Nebraska|  1.79|       [1, 79]|              1|              79|
|  4| Kenneth| Thomas|  Male| 35|    New York|  1.68|       [1, 68]|              1|              68|
|  5|    John| Miller|  Male| 69|  New Mexico|  1.61|       [1, 61]|              1|              61|
|  6|Benjamin|  Lewis|  Male| 61|    Michigan|  1.93|       [1, 93]|              1|              93|
|  7|   Linda|  Baker|Female| 45|    New York|  1.74|       [1, 74]|              

### Filtering and sorting

In [299]:
# Filtering names with occurences equal to 3 or more:

from pyspark.sql.functions import count, avg, round, collect_list, sort_array, array_contains

df_filtered = (df
 .groupBy("name")
 .agg(
     count("name").alias("count"),
     round(avg("height"), 2).alias("average_height"),
     collect_list("location").alias("states")
     )
 .filter((col("count") >= 3) & (col("count") <= 4)) # Choosing name occurences >= 3 and <= 4
 .sort(col("count").desc())  # Sorting the list by descending occurences of names
 .withColumn("states", sort_array("states", asc=True))  # Sorting a "states" array
    )

df_filtered.show(truncate=False)  # "Truncate=False" to show all the locations
df_filtered.printSchema()

+---------+-----+--------------+-------------------------------------------------+
|name     |count|average_height|states                                           |
+---------+-----+--------------+-------------------------------------------------+
|Jack     |4    |1.58          |[Colorado, Massachusetts, New Hampshire, Wyoming]|
|Alexander|4    |1.7           |[Hawaii, Indiana, Michigan, New Jersey]          |
|Benjamin |3    |1.78          |[Kentucky, Michigan, Virginia]                   |
|Gary     |3    |1.76          |[Kentucky, Pennsylvania, Wyoming]                |
|Michelle |3    |1.87          |[Georgia, Nebraska, Tennessee]                   |
|Kevin    |3    |1.62          |[Arizona, Kentucky, New York]                    |
|Charles  |3    |1.78          |[Florida, Idaho, Montana]                        |
+---------+-----+--------------+-------------------------------------------------+

root
 |-- name: string (nullable = true)
 |-- count: long (nullable = false)
 |-- aver

In [304]:
# Changing a schema of a DataFrame to Nullable:

new_schema = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("count", LongType(), nullable=True),
    StructField("average_height", DoubleType(), nullable=True),
    StructField("states", ArrayType(StringType()), nullable=True),
    ])

df_filtered = spark.createDataFrame(df_filtered.rdd, schema=new_schema)
df_filtered.printSchema()
df_filtered.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- count: long (nullable = true)
 |-- average_height: double (nullable = true)
 |-- states: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---------+-----+--------------+-------------------------------------------------+
|name     |count|average_height|states                                           |
+---------+-----+--------------+-------------------------------------------------+
|Alexander|4    |1.7           |[Hawaii, Indiana, Michigan, New Jersey]          |
|Jack     |4    |1.58          |[Colorado, Massachusetts, New Hampshire, Wyoming]|
|Benjamin |3    |1.78          |[Kentucky, Michigan, Virginia]                   |
|Gary     |3    |1.76          |[Kentucky, Pennsylvania, Wyoming]                |
|Kevin    |3    |1.62          |[Arizona, Kentucky, New York]                    |
|Michelle |3    |1.87          |[Georgia, Nebraska, Tennessee]                   |
|Charles  |3    |1.78          |[Florida, Ida

In [256]:
# String match searching:

(df
 .filter(col("name").isin("John", "Jennifer", "Alexander", "Jack"))
 .sort("name", "age")
 .show()
    )

+---+---------+--------+------+----+-------------+------+
| id|     name| surname|   sex| age|     location|height|
+---+---------+--------+------+----+-------------+------+
| 56|Alexander|   White|  Male|28.0|      Indiana|  1.54|
| 20|Alexander|Castillo|  Male|31.0|   New Jersey|  1.54|
| 36|Alexander|  Torres|  Male|55.0|     Michigan|   2.0|
| 89|Alexander|   Scott|  Male|66.0|       Hawaii|  1.73|
| 39|     Jack|  Miller|  Male|32.0|New Hampshire|  1.57|
| 25|     Jack|  Watson|  Male|38.0|      Wyoming|  1.52|
| 31|     Jack|   Smith|  Male|56.0|Massachusetts|  1.57|
| 49|     Jack|Mitchell|  Male|66.0|     Colorado|  1.66|
| 10| Jennifer|  Harris|Female|30.0|        Maine|  1.74|
|  5|     John|  Miller|  Male|69.0|   New Mexico|  1.61|
+---+---------+--------+------+----+-------------+------+



### Exploding an array and filtering it:

In [305]:
from pyspark.sql.functions import count, avg, round, collect_list, sort_array, array_contains, explode

(df
 .groupBy("name")
 .agg(
     count("name").alias("count"),
     round(avg("height"), 2).alias("average_height"),
     collect_list("location").alias("states")
     )
 .filter(col("count") >= 3) # Choosing name occurences >= 3
 .select("name", "count", "average_height", explode("states").alias("states"))
 .sort(col("count").desc(), "name")  # Sorting the list by descending occurences of names
 .filter(
     ( col("states").contains("New") | col("states").endswith("a") | col("states").like("%in%") ) 
     & ( ~col("states").isin("Florida") )
    )  # Filtering the exploded array
 .show(truncate=False)  # "Truncate=False" to show all the locations
    )

# .filter(col("states").array_contains("New") | col("name").endswith("a") | col("states").like("%in%"))
# .withColumn("states", sort_array("states", asc=True))  # Sorting a "states" array

+---------+-----+--------------+-------------+
|name     |count|average_height|states       |
+---------+-----+--------------+-------------+
|Alexander|4    |1.7           |Indiana      |
|Alexander|4    |1.7           |New Jersey   |
|Jack     |4    |1.58          |Wyoming      |
|Jack     |4    |1.58          |New Hampshire|
|Benjamin |3    |1.78          |Virginia     |
|Charles  |3    |1.78          |Montana      |
|Gary     |3    |1.76          |Pennsylvania |
|Gary     |3    |1.76          |Wyoming      |
|Kevin    |3    |1.62          |Arizona      |
|Kevin    |3    |1.62          |New York     |
|Michelle |3    |1.87          |Georgia      |
|Michelle |3    |1.87          |Nebraska     |
+---------+-----+--------------+-------------+



In [294]:
# Directly filtering an array without exploding it:

from pyspark.sql.functions import count, avg, round, collect_list, sort_array, array_contains, expr

(df
 .groupBy("name")
 .agg(
     count("name").alias("count"),
     round(avg("height"), 2).alias("average_height"),
     collect_list("location").alias("states")
     )
 .filter(col("count") >= 3) # Choosing name occurences >= 3
 .withColumn("states", expr("filter(states, state -> NOT state like '%in%')"))  # Filtering the array
 .sort(col("count").desc(), "name")  # Sorting the list by descending occurences of names
 .show(truncate=False)  # "Truncate=False" to show all the locations
    )

+---------+-----+--------------+----------------------------------------+
|name     |count|average_height|states                                  |
+---------+-----+--------------+----------------------------------------+
|Alexander|4    |1.7           |[New Jersey, Michigan, Indiana, Hawaii] |
|Jack     |4    |1.58          |[Massachusetts, New Hampshire, Colorado]|
|Benjamin |3    |1.78          |[Michigan, Kentucky]                    |
|Charles  |3    |1.78          |[Florida, Montana, Idaho]               |
|Gary     |3    |1.76          |[Pennsylvania, Kentucky]                |
|Kevin    |3    |1.62          |[Arizona, New York, Kentucky]           |
|Michelle |3    |1.87          |[Georgia, Tennessee, Nebraska]          |
+---------+-----+--------------+----------------------------------------+



### Mathematical transformations

In [None]:
df = df.withColumn("height_sqrt", sqrt(col("height")))
df.show()

### Changing column data types

In [230]:
df.select(col("age").cast("float")).printSchema()

root
 |-- age: float (nullable = true)



### Using **SQL** to transform DataFrame

In [32]:
# Creating a SQL table (as a view):
df.createOrReplaceTempView("dataset")

# Reading the created SQL table:
spark.sql('''

  SELECT
     id
    ,name
    ,surname
    ,age
    ,location as state
  FROM dataset
  WHERE 1=1
    AND (location like "%New%") 
    AND (age between 20 and 35)
    
    ''').show()

+---+---------+--------+---+-------------+
| id|     name| surname|age|        state|
+---+---------+--------+---+-------------+
|  4|  Kenneth|  Thomas| 35|     New York|
| 20|Alexander|Castillo| 31|   New Jersey|
| 39|     Jack|  Miller| 32|New Hampshire|
| 40|    Kevin|    Hall| 23|     New York|
| 71|    Susan|Campbell| 20|New Hampshire|
+---+---------+--------+---+-------------+



### Joining DataFrames

In [344]:
# Creating DataFrames for Joining:

df_1 = df.select("id","name","surname")
df_2 = df.select("id","sex","age","height")

df_1.show(5)
df_2.show(5)

+---+-------+-------+
| id|   name|surname|
+---+-------+-------+
|  1|  Debra| Walker|
|  2|  Helen|Jimenez|
|  3| Andrew| Miller|
|  4|Kenneth| Thomas|
|  5|   John| Miller|
+---+-------+-------+
only showing top 5 rows

+---+------+----+------+
| id|   sex| age|height|
+---+------+----+------+
|  1|Female|22.0|  1.67|
|  2|Female|19.0|   1.6|
|  3|  Male|66.0|  1.79|
|  4|  Male|35.0|  1.68|
|  5|  Male|69.0|  1.61|
+---+------+----+------+
only showing top 5 rows



In [313]:
# Inner join:
df_1.join(df_2, on="id", how="inner").show(10)

+---+--------+-------+------+----+------+
| id|    name|surname|   sex| age|height|
+---+--------+-------+------+----+------+
|  1|   Debra| Walker|Female|22.0|  1.67|
|  2|   Helen|Jimenez|Female|19.0|   1.6|
|  3|  Andrew| Miller|  Male|66.0|  1.79|
|  4| Kenneth| Thomas|  Male|35.0|  1.68|
|  5|    John| Miller|  Male|69.0|  1.61|
|  6|Benjamin|  Lewis|  Male|61.0|  1.93|
|  7|   Linda|  Baker|Female|45.0|  1.74|
|  8| Carolyn|  Lewis|Female|31.0|  1.61|
|  9|    Gary|   Wood|  Male|70.0|  1.83|
| 10|Jennifer| Harris|Female|30.0|  1.74|
+---+--------+-------+------+----+------+
only showing top 10 rows



In [322]:
# Cross join:
df_1.crossJoin(df_2).show(10)

+---+-----+-------+---+------+----+------+
| id| name|surname| id|   sex| age|height|
+---+-----+-------+---+------+----+------+
|  1|Debra| Walker|  1|Female|22.0|  1.67|
|  1|Debra| Walker|  2|Female|19.0|   1.6|
|  1|Debra| Walker|  3|  Male|66.0|  1.79|
|  1|Debra| Walker|  4|  Male|35.0|  1.68|
|  1|Debra| Walker|  5|  Male|69.0|  1.61|
|  1|Debra| Walker|  6|  Male|61.0|  1.93|
|  1|Debra| Walker|  7|Female|45.0|  1.74|
|  1|Debra| Walker|  8|Female|31.0|  1.61|
|  1|Debra| Walker|  9|  Male|70.0|  1.83|
|  1|Debra| Walker| 10|Female|30.0|  1.74|
+---+-----+-------+---+------+----+------+
only showing top 10 rows



In [320]:
# More complex Cross join:
((df_1.filter(col("id") <= 3))
 .crossJoin(df_2.filter(col("id") <= 3))
 .show(20)
    )

+---+------+-------+---+------+----+------+
| id|  name|surname| id|   sex| age|height|
+---+------+-------+---+------+----+------+
|  1| Debra| Walker|  1|Female|22.0|  1.67|
|  2| Helen|Jimenez|  1|Female|22.0|  1.67|
|  3|Andrew| Miller|  1|Female|22.0|  1.67|
|  1| Debra| Walker|  2|Female|19.0|   1.6|
|  2| Helen|Jimenez|  2|Female|19.0|   1.6|
|  3|Andrew| Miller|  2|Female|19.0|   1.6|
|  1| Debra| Walker|  3|  Male|66.0|  1.79|
|  2| Helen|Jimenez|  3|  Male|66.0|  1.79|
|  3|Andrew| Miller|  3|  Male|66.0|  1.79|
+---+------+-------+---+------+----+------+



## 13) Caching

- paměti pro driver/executor podle velikosti dat
- exekuční plán - explain()
- sparksession time - start.time, 
- broadcasting a small dataset

### Caching a DataFrame

In [323]:
# Caching the DataFrame to a memory:
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY).count()   # count() action is used to trigger the caching

100

In [324]:
print(df.storageLevel)

Memory Serialized 1x Replicated


In [None]:
spark.sql.shuffle.partitions
spark.sql.autoBroadcastJoinThreshold
counter = spark.sparkContext.accumulator(0)

### Broadcasting a DataFrame in a join:

In [25]:
# Creating DataFrames for broadcasting:

from pyspark.sql.functions import substring, col


path = "created_dataset_csv"
df_large = (spark.read
       .option("sep", ",")
       .option("header", True)
       .csv(path)
        )

df_small = (df_large
 .withColumn("name", substring(col("name"), 0, 3))
 .withColumn("surname_abbr", substring(col("surname"), 0, 3))
 .select("id","name","surname_abbr")
    )

df_small.show(5)
df_large.show(5)

+---+----+------------+
| id|name|surname_abbr|
+---+----+------------+
|  1| Deb|         Wal|
|  2| Hel|         Jim|
|  3| And|         Mil|
|  4| Ken|         Tho|
|  5| Joh|         Mil|
+---+----+------------+
only showing top 5 rows

+---+-------+-------+------+----+------------+------+
| id|   name|surname|   sex| age|    location|height|
+---+-------+-------+------+----+------------+------+
|  1|  Debra| Walker|Female|22.0|     Alabama|  1.67|
|  2|  Helen|Jimenez|Female|19.0|Pennsylvania|   1.6|
|  3| Andrew| Miller|  Male|66.0|    Nebraska|  1.79|
|  4|Kenneth| Thomas|  Male|35.0|    New York|  1.68|
|  5|   John| Miller|  Male|69.0|  New Mexico|  1.61|
+---+-------+-------+------+----+------------+------+
only showing top 5 rows



In [501]:
df_large.rdd.getNumPartitions()

1

In [21]:
# Repartitioning larger DataFrame to more partitions than 1:
df_large = df_large.repartition(4)

# Displaying the number of partitions:
df_large.rdd.getNumPartitions()

4

In [26]:
# Performing a join with broadcasting:
from pyspark.sql.functions import broadcast

df_b = df_large.join(broadcast(df_small), on="id", how="inner")
#df_b = df_large.join(broadcast(df_small), df_large.id==df_small.id, how="inner")

print(df_b.show(5))
print("Number of partitions:", df.rdd.getNumPartitions())

# Deleting cached copies of df_1 DataFrame:
# df_1.unpersist()

+---+-------+-------+------+----+------------+------+----+------------+
| id|   name|surname|   sex| age|    location|height|name|surname_abbr|
+---+-------+-------+------+----+------------+------+----+------------+
|  1|  Debra| Walker|Female|22.0|     Alabama|  1.67| Deb|         Wal|
|  2|  Helen|Jimenez|Female|19.0|Pennsylvania|   1.6| Hel|         Jim|
|  3| Andrew| Miller|  Male|66.0|    Nebraska|  1.79| And|         Mil|
|  4|Kenneth| Thomas|  Male|35.0|    New York|  1.68| Ken|         Tho|
|  5|   John| Miller|  Male|69.0|  New Mexico|  1.61| Joh|         Mil|
+---+-------+-------+------+----+------------+------+----+------------+
only showing top 5 rows

None
Number of partitions: 1


In [40]:
df_b.columns[1]

'name'

In [37]:
df_b.select(df_b.columns[2:3]).show()

+---------+
|  surname|
+---------+
|   Walker|
|  Jimenez|
|   Miller|
|   Thomas|
|   Miller|
|    Lewis|
|    Baker|
|    Lewis|
|     Wood|
|   Harris|
|Gutierrez|
|    Patel|
|  Jimenez|
|   Martin|
|    Ortiz|
|    Reyes|
|    Green|
|      Lee|
|  Mendoza|
| Castillo|
+---------+
only showing top 20 rows



In [504]:
# Displaying the execution plan:
df_large.join(broadcast(df_small), on="id", how="inner").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#19036, name#19037, surname#19038, sex#19039, age#19040, location#19041, height#19042, name_abbr#19051, surname_abbr#19062]
   +- BroadcastHashJoin [id#19036], [id#19205], Inner, BuildRight, false
      :- Exchange RoundRobinPartitioning(4), REPARTITION_BY_NUM, [id=#13806]
      :  +- Filter isnotnull(id#19036)
      :     +- FileScan csv [id#19036,name#19037,surname#19038,sex#19039,age#19040,location#19041,height#19042] Batched: false, DataFilters: [isnotnull(id#19036)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/jakub.cajzl/created_dataset.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:string,name:string,surname:string,sex:string,age:string,location:string,height:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#13809]
         +- Project [id#19205, substring(name#19206, 0, 3) AS name_abbr#19051, substring(su

### explain() command

In [140]:
df.select(col("id").alias("cutomer_id")).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange SinglePartition, REPARTITION_BY_NUM, [id=#1684]
   +- Project [id#0 AS cutomer_id#1585]
      +- FileScan parquet [id#0] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jakub.cajzl/created_dataset.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>


