# PySpark Tutorial

## 1. Creating a SparkSession and SparkContext
In PySpark, SparkContext is the low-level entry point to Spark functionality: it connects your application to the cluster through a resource manager. Since Spark 2.0 you typically work with SparkSession instead, the unified entry point for DataFrames and SQL. A SparkSession wraps a SparkContext, which remains available as `spark.sparkContext`.

In [0]:
# Import necessary libraries
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Legacy approach: create a SparkContext directly
# conf = SparkConf().setAppName("MyApp").setMaster("local[*]")
# sc = SparkContext(conf=conf)

# Recommended approach: create a SparkSession (or reuse the active one)
spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

# The underlying SparkContext; later cells refer to it as sc
sc = spark.sparkContext


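## 2. Creating Different Spark Sessions
`SparkSession.builder.getOrCreate()` returns the already-active session when one exists, so the two "new" sessions in the cell below are the same object as `spark` (note the identical addresses in the output). To get a separate session that shares the same SparkContext but has its own SQL configuration, temporary views, and UDFs, use `spark.newSession()`.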

In [0]:
from pyspark.sql import SparkSession

# Default Spark session provided by Databricks
print("Default SparkSession")
print(spark)

# getOrCreate() returns the already-active session, so this is the same object as spark
new_spark1 = SparkSession.builder \
    .appName("exampleApp1") \
    .getOrCreate()

print("New SparkSession 1")
print(new_spark1)

# This call also returns the same active session
new_spark2 = SparkSession.builder \
    .appName("exampleApp2") \
    .getOrCreate()

print("New SparkSession 2")
print(new_spark2)

# Example usage of the new Spark sessions
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
columns = ["Name", "Value"]

df1 = new_spark1.createDataFrame(data, columns)
df2 = new_spark2.createDataFrame(data, columns)

df1.show()
df2.show()



Default SparkSession
<pyspark.sql.session.SparkSession object at 0x7f470ed58490>
New SparkSession 1
<pyspark.sql.session.SparkSession object at 0x7f470ed58490>
New SparkSession 2
<pyspark.sql.session.SparkSession object at 0x7f470ed58490>
+-----+-----+
| Name|Value|
+-----+-----+
|Alice|    1|
|  Bob|    2|
|Cathy|    3|
+-----+-----+

+-----+-----+
| Name|Value|
+-----+-----+
|Alice|    1|
|  Bob|    2|
|Cathy|    3|
+-----+-----+



## 3. Creating RDDs
Resilient Distributed Datasets (RDDs) are the fundamental data structure of Spark. They are immutable and distributed collections of objects. RDDs can be created in multiple ways:

### 3.1 Parallelizing an Existing Collection
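This method distributes a local Python collection from the driver. Because the whole collection must first fit in the driver's memory, it is mainly useful for small datasets, tests, and prototyping.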

In [0]:
# Create an RDD by parallelizing a collection
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Perform an action to collect the results
print(rdd.collect())  # Output: [1, 2, 3, 4, 5]

[1, 2, 3, 4, 5]


### 3.2 Reading from a Text File
You can read a text file stored locally or in a distributed file system such as HDFS. Each line of the file becomes one element of the RDD.

In [0]:
# Create an RDD by reading a text file
rdd = sc.textFile("dbfs:/FileStore/error_log.txt")

# Perform an action to collect the results
print(rdd.collect())

['24-06-2024 12:20:13: System.Configuration.ConfigurationErrorsException: PythonScriptFilePath is not configured.', '   at DE_IDENTIFICATION_TOOL.Pythonresponse.PythonScriptFilePath.FindProjectRootDirectory()']


### 3.3 Reading from Multiple Text Files
`textFile` also accepts a directory, a wildcard pattern, or a comma-separated list of paths, and reads every matching file into a single RDD.

In [0]:
# Create an RDD by reading multiple text files
rdd = sc.textFile("dbfs:/FileStore/TextFiles/*")

# Perform an action to collect the results
print(rdd.collect())

['File1 text', 'File2 Text', 'File3 Text']


### 3.4 Reading Whole Text Files
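`wholeTextFiles` returns one record per file: a `(file path, file content)` pair. It is meant for text files; binary files such as the `.xlsx` workbook matched by the wildcard below are decoded as text and come out as unreadable bytes.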

In [0]:
# Create an RDD by reading whole text files
rdd = sc.wholeTextFiles("dbfs:/FileStore/*")

# Perform an action to collect the results
print(rdd.collect())

[('dbfs:/FileStore/Financial_Sample.xlsx', 'PK\x03\x04\x14\x00\x06\x00\x08\x00\x00\x00!\x00\x1f�\x16`�\x01\x00\x00\x11\x08\x00\x00\x13\x00\x08\x02[Content_Types].xml �\x04\x02(�\x00\x02\x00\x00\x00 ...

### 3.5 Reading from HDFS
If your data is stored in HDFS, you can read it directly into an RDD.

In [0]:
# # Create an RDD by reading from HDFS
# rdd = sc.textFile("hdfs:///path/to/file.txt")

# # Perform an action to collect the results
# print(rdd.collect())

## 4. Reading Data into DataFrames
A DataFrame is a distributed collection of data organized into named columns, conceptually equivalent to a table in a relational database or a data frame in R or pandas. DataFrames provide a higher-level abstraction than RDDs, and because Spark knows their schema it can optimize DataFrame queries with the Catalyst optimizer.

### 4.1 Reading CSV Files into a DataFrame

In [0]:
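# Reuse the active SparkSession from section 1 (uncomment to create one)
# spark = SparkSession.builder.appName("ReadDataFrame").getOrCreate()

# Read a tab-separated file into a DataFrame: header=True takes column names
# from the first line, inferSchema=True scans the data to pick column types,
# and sep='\t' sets the delimiter (the file has a .csv extension but uses tabs)
df = spark.read.csv("dbfs:/FileStore/Product.csv", header=True, inferSchema=True, sep='\t')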

# Show the first few rows of the DataFrame
df.show()

+----------+--------------------+-------------+-----+-----------+-----------+-----------------------+-----------------+
|ProductKey|             Product|Standard Cost|Color|Subcategory|   Category|Background Color Format|Font Color Format|
+----------+--------------------+-------------+-----+-----------+-----------+-----------------------+-----------------+
|       210|HL Road Frame - B...|      $868.63|Black|Road Frames| Components|                #000000|          #FFFFFF|
|       215|Sport-100 Helmet,...|       $12.03|Black|    Helmets|Accessories|                #000000|          #FFFFFF|
|       216|Sport-100 Helmet,...|       $13.88|Black|    Helmets|Accessories|                #000000|          #FFFFFF|
|       217|Sport-100 Helmet,...|       $13.09|Black|    Helmets|Accessories|                #000000|          #FFFFFF|
|       253|LL Road Frame - B...|       $176.2|Black|Road Frames| Components|                #000000|          #FFFFFF|
|       254|LL Road Frame - B...|      $

### 4.2 Reading Parquet Files into a DataFrame

In [0]:

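# Read a Parquet file into a DataFrame (Parquet files store their own schema)
df = spark.read.parquet("dbfs:/FileStore/tables/MT_cars.parquet")

# Show the DataFrame
df.show()

# Stop the SparkSession only when you are completely done with it; stopping it
# here would break the later cells that still use spark and sc
# spark.stop()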


+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|              model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
|          M



## 5. Spark APIs
Spark provides various APIs for different types of processing. Some of the most commonly used ones are:

### 5.1 Spark SQL
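Spark SQL is used for structured data processing. You can register a DataFrame as a temporary view and query it with SQL; `spark.sql` returns the result as a new DataFrame. (The typed Dataset API is available only in Scala and Java.)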

In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession

# Reuse the active SparkSession (a new one is created if none is running)
spark = SparkSession.builder \
    .appName("MyApp1") \
    .getOrCreate()

# Load the tab-separated product file
df = spark.read.csv("dbfs:/FileStore/Product.csv", header=True, inferSchema=True, sep='\t')



In [0]:
# Register the DataFrame as a temporary view that SQL queries can reference
df.createOrReplaceTempView("products")

# Run a SQL query against the view
result = spark.sql("SELECT * FROM products")
result.show()

+----------+--------------------+-------------+-----+-----------+-----------+-----------------------+-----------------+
|ProductKey|             Product|Standard Cost|Color|Subcategory|   Category|Background Color Format|Font Color Format|
+----------+--------------------+-------------+-----+-----------+-----------+-----------------------+-----------------+
|       210|HL Road Frame - B...|      $868.63|Black|Road Frames| Components|                #000000|          #FFFFFF|
|       215|Sport-100 Helmet,...|       $12.03|Black|    Helmets|Accessories|                #000000|          #FFFFFF|
|       216|Sport-100 Helmet,...|       $13.88|Black|    Helmets|Accessories|                #000000|          #FFFFFF|
|       217|Sport-100 Helmet,...|       $13.09|Bla

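### 5.2 Pandas API on Spark
The pandas API on Spark (`pyspark.pandas`, available since Spark 3.2) provides a pandas-like API whose operations run on Spark DataFrames, so familiar pandas code can scale beyond a single machine.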

In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession

# Reuse the active SparkSession (a new one is created if none is running)
spark = SparkSession.builder \
    .appName("MyApp1") \
    .getOrCreate()

# Load the tab-separated product file
df = spark.read.csv("dbfs:/FileStore/Product.csv", header=True, inferSchema=True, sep='\t')


In [0]:
# Import the pandas API on Spark
import pyspark.pandas as ps

# Wrap the Spark DataFrame in a pandas-on-Spark DataFrame
# (on Spark 3.3+ the equivalent is df.pandas_api())
pdf = ps.DataFrame(df)
pdf.head()

,ProductKey,Product,Standard Cost,Color,Subcategory,Category,Background Color Format,Font Color Format
0,210,"HL Road Frame - Black, 58",$868.63,Black,Road Frames,Components,#000000,#FFFFFF
1,215,"Sport-100 Helmet, Black",$12.03,Black,Helmets,Accessories,#000000,#FFFFFF
2,216,"Sport-100 Helmet, Black",$13.88,Black,Helmets,Accessories,#000000,#FFFFFF
3,217,"Sport-100 Helmet, Black",$13.09,Black,Helmets,Accessories,#000000,#FFFFFF
4,253,"LL Road Frame - Black, 58",$176.2,Black,Road Frames,Components,#000000,#FFFFFF


### 5.3 Spark Streaming
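Spark Streaming (the DStream API used below) processes real-time data as a series of small batches. It can ingest data from sources such as TCP sockets, files, and Kinesis. It is Spark's older streaming API; for new applications, the Spark documentation recommends Structured Streaming, which is built on DataFrames and also reads from sources such as Kafka.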

In [0]:
# Import necessary libraries
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Reuse the existing Spark session and its SparkContext
spark = SparkSession.builder.appName("StreamingWordCount").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Create a DStream that connects to hostname:port.
# A process must be writing text to this socket, e.g. `nc -lk 9999`.
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Save each non-empty batch to DBFS as CSV (one output directory per batch)
def save_to_dbfs(rdd):
    if not rdd.isEmpty():
        df = rdd.toDF(["word", "count"])
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        output_path = f"dbfs:/FileStore/tables/wordCounts_{timestamp}.csv"
        df.write.csv(output_path)

# Apply the function to each RDD
wordCounts.foreachRDD(save_to_dbfs)

# Start the computation
ssc.start()

# Run for up to 60 seconds, then stop the stream but keep the SparkContext
# alive for the rest of the notebook
ssc.awaitTerminationOrTimeout(60)
ssc.stop(stopSparkContext=False, stopGraceFully=True)


## 6. RDDs vs DataFrames
RDDs and DataFrames are two types of data structures available in Spark. Here are some key differences:

- **Abstraction Level**: RDDs are low-level, providing more control over data. DataFrames are high-level, providing more convenience.
- **Optimization**: DataFrames have built-in optimizations and use the Catalyst optimizer. RDDs require manual optimization.
- **API**: RDDs use functional programming APIs, while DataFrames use declarative APIs similar to SQL.

## 7. Converting RDD to DataFrame
You can convert an RDD to a DataFrame with the RDD's `toDF` method (this requires an active SparkSession) or with `spark.createDataFrame(rdd)`.

In [0]:
# Example of converting RDD to DataFrame
from pyspark.sql import Row

# Create an RDD
rdd = sc.parallelize([Row(name="Alice", age=5), Row(name="Bob", age=10)])

# Convert RDD to DataFrame
df = rdd.toDF()
df.show()

+-----+---+
| name|age|
+-----+---+
|Alice|  5|
|  Bob| 10|
+-----+---+



## 8. Different Data Sources Available
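Spark can read data from many file formats and storage systems, including the following (some, such as Avro, Kafka, and HBase, may need an additional connector package depending on your environment):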
- CSV
- JSON
- Parquet
- ORC
- Avro
- JDBC
- HDFS
- S3
- HBase
- Kafka


## 9. collect
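`collect` is an action that returns every element of an RDD (or every row of a DataFrame) to the driver. Because the whole result must fit in the driver's memory, use it only on small datasets; `take(n)` or `show()` is safer for inspecting large data.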

In [0]:
# Example of collect
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.collect()
print(result)  # Output: [1, 2, 3, 4, 5]

[1, 2, 3, 4, 5]


## 10. withColumn and withColumnRenamed
`withColumn` adds a new column or replaces an existing column with the same name; `withColumnRenamed` renames an existing column. Both return a new DataFrame rather than modifying the original, because DataFrames are immutable.

In [0]:
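# Example of withColumn
from pyspark.sql.functions import col

# "name" is a string column, so col("name") + 1 cannot be evaluated as a number
# and yields null (see the output below). Use a numeric column such as
# col("age") + 1 to get meaningful values.
df = df.withColumn("new_column", col("name") + 1)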
df.show()

# Example of withColumnRenamed
df = df.withColumnRenamed("new_column", "new_name")
df.show()

+-----+---+----------+
| name|age|new_column|
+-----+---+----------+
|Alice|  5|      null|
|  Bob| 10|      null|
+-----+---+----------+

+-----+---+--------+
| name|age|new_name|
+-----+---+--------+
|Alice|  5|    null|
|  Bob| 10|    null|
+-----+---+--------+



## 11. where and filter
`where` and `filter` are aliases: both keep only the rows that satisfy a condition.

In [0]:
# Example of where and filter (equivalent calls)
# No row has age > 10, so both return an empty DataFrame
df.where(col("age") > 10).show()
df.filter(col("age") > 10).show()

+----+---+--------+
|name|age|new_name|
+----+---+--------+
+----+---+--------+

+----+---+--------+
|name|age|new_name|
+----+---+--------+
+----+---+--------+



## 12. orderBy, sort, groupBy
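`orderBy` and `sort` are aliases that sort the DataFrame by one or more columns (ascending by default). `groupBy` groups rows that share the same values in the specified columns and returns a `GroupedData` object, which needs an aggregation such as `count()` or `agg()` to produce a DataFrame.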

In [0]:
# Example of orderBy and sort
df.orderBy("age").show()
df.sort("age").show()

# Example of groupBy
df.groupBy("age").count().show()

+-----+---+--------+
| name|age|new_name|
+-----+---+--------+
|Alice|  5|    null|
|  Bob| 10|    null|
+-----+---+--------+

+-----+---+--------+
| name|age|new_name|
+-----+---+--------+
|Alice|  5|    null|
|  Bob| 10|    null|
+-----+---+--------+

+---+-----+
|age|count|
+---+-----+
|  5|    1|
| 10|    1|
+---+-----+



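## 13. drop and dropDuplicates
`drop` removes one or more columns from the DataFrame (dropping a column that does not exist is a no-op). `dropDuplicates` removes duplicate rows; pass a list of column names to compare only those columns.

In [0]:
# Example of drop: remove the new_name column created in section 10
df = df.drop("new_name")
df.show()

# Example of dropDuplicates: remove rows that are identical in every column
df = df.dropDuplicates()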
df.show()

## 14. union, unionAll, unionByName
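`union` combines the rows of two DataFrames by column position, so both must have the same number of columns in the same order; like SQL `UNION ALL`, it keeps duplicates (follow it with `distinct()` for SQL `UNION` semantics). `unionAll` is an alias for `union`. `unionByName` matches columns by name, so the column order may differ between the two DataFrames.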

In [0]:
# Example of union: columns are matched by position
df1 = spark.createDataFrame([(1, "Alice")], ["id", "name"])
df2 = spark.createDataFrame([(2, "Bob")], ["id", "name"])
df_union = df1.union(df2)
df_union.show()

# Example of unionByName: columns are matched by name, so df2 may list
# them in a different order
df1 = spark.createDataFrame([(1, "Alice")], ["id", "name"])
df2 = spark.createDataFrame([("Bob", 2)], ["name", "id"])
df_union_by_name = df1.unionByName(df2)
df_union_by_name.show()

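## 15. transform, apply, map, and flatMap
`transform` (Spark 3.0+) applies a function that takes a DataFrame and returns a DataFrame, which keeps chains of custom transformations readable. Spark DataFrame columns have no `apply` method; `apply` belongs to the pandas API on Spark, where `Series.apply` runs a Python function on every value. `map` and `flatMap` are RDD transformations: `map` returns exactly one output element per input element, while `flatMap` returns zero or more and flattens the results.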

In [0]:
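# Example of transform: add_one takes a DataFrame and returns a new one
from pyspark.sql.functions import col

def add_one(df):
    return df.withColumn("age_plus_one", col("age") + 1)

df_transformed = df.transform(add_one)
df_transformed.show()

# Example of apply: use the pandas API on Spark, because a Spark Column has no .apply
import pyspark.pandas as ps

psdf = ps.DataFrame(df)
age_plus_one = psdf["age"].apply(lambda x: x + 1)
print(age_plus_one.head())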

# Example of map
rdd = sc.parallelize([1, 2, 3, 4, 5])
mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect())  # Output: [2, 4, 6, 8, 10]

# Example of flatMap
rdd = sc.parallelize(["hello world", "foo bar"])
flatmapped_rdd = rdd.flatMap(lambda x: x.split(" "))
print(flatmapped_rdd.collect())  # Output: ['hello', 'world', 'foo', 'bar']

## 16. fillna, na.fill, and replace
`fillna` and its alias `df.na.fill` replace null values in a DataFrame; a scalar fill value is applied only to columns of a matching type. `replace` (alias `df.na.replace`) substitutes specified values with new ones.

In [0]:
# Example of fillna: 0 fills nulls in numeric columns only
df_filled = df.fillna(0)
df_filled.show()

# Example of replace: swap one value for another in the name column
df_replaced = df.replace("Alice", "Alicia", subset=["name"])
df_replaced.show()

## 17. pivot
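`pivot` rotates a DataFrame: the distinct values of one column become separate columns, and an aggregation fills each resulting cell. It is called on grouped data, as in `groupBy(...).pivot(...)`. Passing the list of pivot values explicitly spares Spark an extra job to compute them.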

In [0]:
# Example of pivot (column1, column2 and column3 are placeholders for your own columns)
df_pivot = df.groupBy("column1").pivot("column2").sum("column3")
df_pivot.show()

## 18. repartition
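`repartition` changes the number of partitions in a DataFrame, optionally hash-partitioning the rows by one or more columns. It performs a full shuffle, so use it when you need more parallelism or need to rebalance skewed data. To only reduce the number of partitions, `coalesce` is usually cheaper because it avoids a full shuffle.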

In [0]:
# Example of repartition
df_repartitioned = df.repartition(4)
print(df_repartitioned.rdd.getNumPartitions())  # Output: 4