In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


In [11]:
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.15
Branch HEAD
Compiled by user ubuntu on 2023-09-09T01:53:20Z
Revision ce5ddad990373636e94071e7cef2f31021add07b
Url https://github.com/apache/spark
Type --help for more information.


In [3]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("PySpark Data Analysis") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()


24/02/18 14:44:16 WARN Utils: Your hostname, Pratiks-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.43 instead (on interface en0)
24/02/18 14:44:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/18 14:44:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load your dataset (replace 'your_dataset.csv' with your actual dataset file)
data = spark.read.csv('sample_data_test.csv', header=True, inferSchema=True)


                                                                                

In [5]:
# Display the schema of the dataset
data.printSchema()

# Show the first few rows of the dataset
data.show(5)

root
 |-- Index: integer (nullable = true)
 |-- Customer Id: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Phone 1: string (nullable = true)
 |-- Phone 2: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Subscription Date: date (nullable = true)
 |-- Website: string (nullable = true)

+-----+---------------+----------+---------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|Index|    Customer Id|First Name|Last Name|             Company|            City|             Country|             Phone 1|             Phone 2|               Email|Subscription Date|             Website|
+-----+---------------+----------+---------+--------------------+----------------+--------------------+

In [6]:
# Count the number of rows in the dataset
print("Total number of rows:", data.count())


[Stage 3:>                                                          (0 + 8) / 8]

Total number of rows: 2000000


                                                                                

In [7]:
cleaned_data = data.dropna()

# Count the number of rows after cleaning
print("Total number of rows after cleaning:", cleaned_data.count())




Total number of rows after cleaning: 2000000


                                                                                

In [8]:
# Wide transformation example
wide_df = data.groupBy("Country").count()
print("Wide Transformation Result:")
wide_df.show()

Wide Transformation Result:


[Stage 9:>                                                          (0 + 8) / 8]

+--------------------+-----+
|             Country|count|
+--------------------+-----+
|                Chad| 8134|
|            Anguilla| 8128|
|            Paraguay| 8130|
|               Macao| 8247|
|Heard Island and ...| 8082|
|               Yemen| 8058|
|             Senegal| 8181|
|             Tokelau| 8196|
|              Sweden| 8110|
|            Kiribati| 8210|
|French Southern T...| 8091|
|              Guyana| 8068|
|         Philippines| 8227|
|             Eritrea| 7974|
|              Jersey| 8011|
|            Djibouti| 8286|
|               Tonga| 8160|
|      Norfolk Island| 8113|
|            Malaysia| 8166|
|           Singapore| 8143|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [9]:
# Narrow transformation example
narrow_df = data.filter(data["Country"] == "Costa Rica").select("First Name", "Last Name")
# Show results
print("Narrow Transformation Result:")
narrow_df.show()

Narrow Transformation Result:
+----------+---------+
|First Name|Last Name|
+----------+---------+
|      Gina|    Rocha|
|  Margaret|   Rogers|
|     Tanya| Franklin|
|     Steve|Valentine|
|     Tracy|   Briggs|
|  Benjamin|     Gray|
|    Colton| Mcintyre|
|      Gail|      Ray|
|    Brandi|Contreras|
|      Troy|  Sellers|
|     Robin|    Black|
|     Nancy|    Brown|
|    Ernest| Mcmillan|
|   Jasmine|   Norman|
|     Sonya|   Turner|
|     Jorge|    Mejia|
|     Chris|   Walter|
|     Penny|  Carlson|
|       Jay|   Booker|
|     David|  Lindsey|
+----------+---------+
only showing top 20 rows



### Repartition and Coalesce

In [12]:
# Get the number of partitions
num_partitions = data.rdd.getNumPartitions()
print("Number of partitions:", num_partitions)

Number of partitions: 8


In [16]:
# Assuming 'data' is your DataFrame
data = data.repartition(10)  # Repartitioning to 10 partitions

# Get the number of partitions after repartitioning
num_partitions_after_repartition = data.rdd.getNumPartitions()
print("Number of partitions after repartitioning:", num_partitions_after_repartition)

[Stage 14:>                                                         (0 + 8) / 8]

Number of partitions after repartitioning: 10




In [17]:
# Coalesce data into 5 partitions
data = data.coalesce(5)
# Get the number of partitions after repartitioning
num_partitions_after_coalesce = data.rdd.getNumPartitions()
print("Number of partitions after coalesce:", num_partitions_after_coalesce)

[Stage 15:>                                                         (0 + 8) / 8]

Number of partitions after coalesce: 5




### Persistence and Caching

In [33]:
# Cache DataFrame
# cache automatically set storage to memory only 
data.cache()

24/02/18 13:52:16 WARN CacheManager: Asked to cache already cached data.


DataFrame[Index: int, Customer Id: string, First Name: string, Last Name: string, Company: string, City: string, Country: string, Phone 1: string, Phone 2: string, Email: string, Subscription Date: date, Website: string]

In [35]:
# Example actions to trigger caching
# Count action
count = data.count()
print("Count:", count)

# Show action
data.show()

Count: 2000000
+------+---------------+----------+----------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
| Index|    Customer Id|First Name| Last Name|             Company|           City|             Country|             Phone 1|             Phone 2|               Email|Subscription Date|             Website|
+------+---------------+----------+----------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
| 20198|2240834Aa9c96aB|  Terrance|    Parker|Montoya, Mckenzie...| Robertsonmouth|               Haiti| +1-748-802-4792x351|001-835-071-8076x100|xellis@booth-tuck...|       2020-09-09| https://stokes.com/|
|180599|dDF06AdcCbA9Fc4|   Allison|     Berry|         Koch-Pruitt|    Hooperburgh|        Cook Islands|001-518-005-1460x297| +1-846-115-0465x242|daughertyla

In [32]:
from pyspark import StorageLevel
# storage levels MEMORY_ONLY,MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2,MEMORY_AND_DISK_2
data.persist(StorageLevel.MEMORY_ONLY)

24/02/18 13:51:25 WARN CacheManager: Asked to cache already cached data.


DataFrame[Index: int, Customer Id: string, First Name: string, Last Name: string, Company: string, City: string, Country: string, Phone 1: string, Phone 2: string, Email: string, Subscription Date: date, Website: string]

In [36]:
# RDD Unpersist
# PySpark automatically monitors every persist() and cache() calls you make and it checks usage on each node and drops persisted data if not used or by using least-recently-used (LRU) algorithm
data.unpersist()

DataFrame[Index: int, Customer Id: string, First Name: string, Last Name: string, Company: string, City: string, Country: string, Phone 1: string, Phone 2: string, Email: string, Subscription Date: date, Website: string]

### Broadcast Variables and Accumulator Variables

In [6]:
# Create a broadcast variable
broadcast_var = spark.sparkContext.broadcast({"USA": "United States", "UK": "United Kingdom"})
# Create an accumulator variable
accum = spark.sparkContext.accumulator(0)



In [7]:
# Create an accumulator variable
accum = spark.sparkContext.accumulator(0)

# Map function to count USA records and update accumulator
# Need to increase executor and driver memory to compute large data accumulator
def count_usa(record):
    global accum
    if record["Country"] == "Romania":
        accum += 1
    return record

# Apply map function to count USA records
usa_records = data.rdd.map(count_usa)

# Perform an action to trigger the map transformation
usa_records.collect()

# Get the value of the accumulator
print("Number of Romania records:", accum.value)


                                                                                

Number of Romania records: 8137


In [10]:
# Create a broadcast variable
broadcast_var = spark.sparkContext.broadcast({"USA": "United States", "UK": "United Kingdom"})

# Use broadcast variable to convert country names
def convert_country_name(record):
    global broadcast_var
    country = record["Country"]
    if country in broadcast_var.value:
        record["Country"] = broadcast_var.value[country]
    return record

# Apply map function to convert country names
converted_df = data.rdd.map(convert_country_name).toDF()

# Show the DataFrame with converted country names
converted_df.show(5)

+-----+---------------+----------+---------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|Index|    Customer Id|First Name|Last Name|             Company|            City|             Country|             Phone 1|             Phone 2|               Email|Subscription Date|             Website|
+-----+---------------+----------+---------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|    1|4962fdbE6Bfee6D|       Pam|   Sparks|        Patel-Deleon|      Blakemouth|British Indian Oc...|    267-243-9490x035|    480-078-0535x889|nicolas00@faulkne...|       2020-11-29| https://nelson.com/|
|    2|9b12Ae76fdBc9bE|      Gina|    Rocha|Acosta, Paul and ...|East Lynnchester|          Costa Rica|        027.142.0940|+1-752-593-4777x0...|  yfarley@morgan.com|       202

### PySpark UDF

In [17]:
# Define a Python function to change first name and last name to lowercase
def lower_case_names(first_name, last_name):
    return f"{first_name.lower()} {last_name.lower()}"

# Register the Python function as a UDF
lower_case_names_udf = udf(lower_case_names, StringType())

# Update the "First Name" and "Last Name" columns with lowercase values
df = data.withColumn("First Name", lower(data["First Name"]))
df = df.withColumn("Last Name", lower(df["Last Name"]))

# Show the result
df.show()


+-----+---------------+----------+---------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|Index|    Customer Id|First Name|Last Name|             Company|             City|             Country|             Phone 1|             Phone 2|               Email|Subscription Date|             Website|
+-----+---------------+----------+---------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|    1|4962fdbE6Bfee6D|       pam|   sparks|        Patel-Deleon|       Blakemouth|British Indian Oc...|    267-243-9490x035|    480-078-0535x889|nicolas00@faulkne...|       2020-11-29| https://nelson.com/|
|    2|9b12Ae76fdBc9bE|      gina|    rocha|Acosta, Paul and ...| East Lynnchester|          Costa Rica|        027.142.0940|+1-752-593-4777x0...|  yfarley@morgan.com|     

24/02/18 17:44:13 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 317972 ms exceeds timeout 120000 ms
24/02/18 17:44:14 WARN SparkContext: Killing executors is not supported by current scheduler.
24/02/18 17:46:52 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$