# PySpark

### Theory


PySpark is the Python API for Apache Spark, a distributed computing framework for processing large datasets. It lets you write Spark applications in Python, a popular language for data analysis and machine learning.

Steps to learn:
>Understand Spark concepts: Get a good understanding of core Spark concepts such as RDDs (Resilient Distributed Datasets), transformations, and actions. These topics are covered in the Apache Spark documentation.

>Start with basic PySpark operations: Once the concepts are clear, work through basic operations such as creating RDDs, performing transformations, and running actions.

>Practice with PySpark examples: Practice on the many PySpark examples available in the official Apache Spark documentation, on GitHub, and on other websites.
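
The difference between transformations and actions named in the first step is easier to see in code. The sketch below is a plain-Python analogy, not Spark code: the built-in map() and filter() stand in for lazy transformations, and list() stands in for an action that triggers the work. The names double, calls, and pipeline are made up for the illustration.

```python
calls = []  # records every value that double() actually processes

def double(x):
    calls.append(x)
    return x * 2

data = [1, 2, 3, 4]

# "Transformations": map() and filter() only describe the work, nothing runs yet.
pipeline = filter(lambda v: v > 4, map(double, data))
calls_before_action = len(calls)

# "Action": list() pulls the data through the pipeline and produces a result.
result = list(pipeline)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

Spark behaves in a similar way: transformations on an RDD or DataFrame are recorded, and the computation runs only when an action such as collect() or count() asks for a result.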

### Data types in PySpark
><b>RDD (Resilient Distributed Dataset)</b>: RDDs are Spark's core abstraction and its lowest-level data structure. An RDD is an immutable, distributed collection of objects that can be processed in parallel across a cluster. RDDs are resilient to failures: a lost partition can be rebuilt on demand from the lineage of operations that produced it. The trade-off is that Spark's query optimizer cannot see inside RDD code, so RDD programs are usually slower than equivalent DataFrame or Dataset programs.

><b>DataFrame</b>: DataFrames are a higher-level abstraction than RDDs and provide an optimized way to work with structured data. Like a table in a relational database, a DataFrame organizes data into rows and named columns. DataFrames are immutable distributed collections built on top of RDDs, with schema information added; the schema is what allows Spark to optimize queries and filters.

><b>Dataset</b>: Datasets combine the typed objects of RDDs with the optimized execution of DataFrames. A Dataset is a distributed collection of strongly typed objects, which lets the compiler check types at compile time and catch errors early in the development cycle. The typed Dataset API is available only in Scala and Java; because Python is dynamically typed, PySpark exposes DataFrames and RDDs but not Datasets.

### Spark Concepts:
#### Spark
>Spark is a distributed computing framework designed for processing large datasets. It is based on the concept of Resilient Distributed Datasets (RDDs), which are fault-tolerant, immutable data structures that can be processed in parallel across multiple nodes in a cluster.

>A Spark application follows a coordinator/worker architecture. The driver program is the central coordinator: it splits a job into tasks and schedules them. A cluster manager allocates resources across the cluster, and executor processes on the worker nodes run the tasks assigned to them.

>Spark provides a high-level API in several programming languages, including Python (PySpark), Java, Scala, and R. It supports a wide range of data processing operations, such as filtering, aggregating, sorting, and joining, which can be performed on RDDs or on more optimized data structures like DataFrames and Datasets.

>One of the key features of Spark is its ability to perform in-memory processing, which allows it to achieve very high processing speeds compared to traditional disk-based processing systems. Spark also provides a number of optimization techniques, such as caching, partitioning, and pipelining, which further improve its performance.

>Spark is used in a wide range of applications, including data processing, machine learning, and stream processing. It is particularly well-suited for big data processing tasks, as it can scale to handle datasets that are too large to fit in a single machine's memory.

>Overall, Spark is a powerful distributed computing framework that allows for efficient processing of large datasets in a distributed and fault-tolerant manner.

#### Typed Object
>A strongly typed object is an object whose data type is known at compile time, that is, fixed when the code is written rather than discovered at runtime.

#### Type-safe
>Type-safe processing refers to a programming approach where type errors are caught at compile time rather than at runtime. If a piece of code tries to perform an operation on an object of the wrong type, the code does not compile, instead of running and failing later.
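
Python itself is dynamically typed, so the contrast with compile-time checking can be seen in a few lines. This sketch is plain Python, not Spark, and add_one is only an illustrative function: the call with a string is accepted when the code is loaded and fails only when it runs.

```python
def add_one(x):
    return x + 1

ok = add_one(1)  # correct type: works

try:
    add_one("1")  # wrong type: nothing stops this line until it executes
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(ok)
print(error_message)
```

In a statically typed language such as Scala, the equivalent mistake would be rejected by the compiler, which is the guarantee that type-safe processing relies on.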

#### Spark Session
In PySpark, a SparkSession is the entry point to a Spark application. It is the primary way to create and manipulate DataFrames and other distributed data structures.<br>
Since Spark 2.0, SparkSession combines the older SQLContext and HiveContext and wraps a SparkContext (available as spark.sparkContext), giving a single, unified API for configuring and accessing Spark functionality.<br>
The SparkSession provides readers for creating DataFrames from sources such as CSV, JSON, Parquet, and Hive tables. DataFrames created this way can be passed to Spark's machine learning (ML) library, and the session also gives access to streaming reads.<br>
Once you have created a SparkSession, you typically reuse that one object for the rest of the application.

#### UDF (User-Defined Function)
A UDF lets you define your own custom function to operate on DataFrame columns, so you can apply arbitrary Python logic to a PySpark DataFrame column.
To use a UDF in PySpark, follow these steps:

>Define a Python function that takes one or more arguments and returns a value.

>Wrap the Python function as a UDF using the udf() function from the pyspark.sql.functions module, specifying the return type of the UDF (it defaults to string if omitted).

>Apply the UDF to a PySpark DataFrame column, for example with the withColumn() method.

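## Practical
PySpark DataFrame from a JSON file: To create a PySpark DataFrame from a JSON file, use the spark.read.json() method. By default it expects one JSON object per line (JSON Lines); for a file that holds a single multi-line JSON document, pass multiLine=True.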
<code>
        from pyspark.sql import SparkSession
        # create a SparkSession
        spark = SparkSession.builder.appName("JSON to DataFrame").getOrCreate()
        # read the JSON file into a DataFrame
        df = spark.read.json("path/to/json/file.json")
        # display the DataFrame
        df.show()
</code>

PySpark RDD from a JSON file: To create a PySpark RDD from a JSON file, use sc.textFile() to read the file as text, then json.loads() to parse each line into a Python dictionary. This works only when every line of the file is a complete JSON document (JSON Lines).
<code>    
    import json
    from pyspark import SparkContext
    # create a SparkContext
    sc = SparkContext("local", "JSON to RDD")
    # read the JSON file as a text file
    text_rdd = sc.textFile("path/to/json/file.json")
    # parse each line of the text file into a Python dictionary
    json_rdd = text_rdd.map(lambda line: json.loads(line))
    # display the RDD contents (collect() brings every record to the driver)
    print(json_rdd.collect())
</code>
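
The per-line parsing passed to map() above can be tried without a cluster. The sketch below uses plain Python with hypothetical file contents (the two records in lines are made up) to show why the approach needs one JSON object per line, and what happens with a pretty-printed document.

```python
import json

# Hypothetical file contents in JSON Lines form: one complete JSON object per line.
lines = [
    '{"name": "Rahul", "age": 29}',
    '{"name": "Aashish", "age": 28}',
]

# The same function given to text_rdd.map(), applied here with Python's map().
records = list(map(lambda line: json.loads(line), lines))
print(records)

# A pretty-printed document spans several lines, so no single line is valid JSON.
pretty = json.dumps({"name": "Rahul", "age": 29}, indent=2).splitlines()
try:
    json.loads(pretty[0])
    first_line_parses = True
except json.JSONDecodeError:
    first_line_parses = False
print(first_line_parses)
```

If the input file is pretty-printed like the second case, reading it with spark.read.json() and multiLine=True is usually the simpler route.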

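PySpark Dataset from a JSON file: The typed Dataset API exists only in Scala and Java, so in PySpark the result of reading a JSON file is always a DataFrame. What Scala writes as df.as("mydataset") becomes alias() in Python, which gives the DataFrame a name for use in joins and column references.
<code>
    from pyspark.sql import SparkSession
    # create a SparkSession
    spark = SparkSession.builder.appName("JSON to Dataset").getOrCreate()
    # read the JSON file into a DataFrame
    df = spark.read.json("path/to/json/file.json")
    # give the DataFrame an alias ("as" is a reserved word in Python, so PySpark uses alias())
    ds = df.alias("mydataset")
    # display the aliased DataFrame
    ds.show()
</code>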

### Selecting data from a PySpark DataFrame

In [1]:
from pyspark.sql import SparkSession
# create a SparkSession
spark = SparkSession.builder.appName("JSON to DataFrame").getOrCreate()

df = spark.createDataFrame([('Rahul', 29), ("Aashish", 28), ("Aashu", 30), ('Sheetal',22)], ['name', 'age'])
df.show()  # show() prints the table itself and returns None, so it is not wrapped in print()

rows = df.select('name').take(3)  # Take the first three rows of the 'name' column as a list of Row objects
selected_rows = [rows[0], rows[2]]  # Pick the first and third of those rows
print(selected_rows)

+-------+---+
|   name|age|
+-------+---+
|  Rahul| 29|
|Aashish| 28|
|  Aashu| 30|
|Sheetal| 22|
+-------+---+

[Row(name='Rahul'), Row(name='Aashu')]


In [2]:
sliced_df = df.limit(2)
sliced_df.show()

+-------+---+
|   name|age|
+-------+---+
|  Rahul| 29|
|Aashish| 28|
+-------+---+



In [3]:
sliced_df = df.select(df.columns[1:4])  # slice the column list from the 2nd column on; df has two columns, so only 'age' is selected
sliced_df.show()

+---+
|age|
+---+
| 29|
| 28|
| 30|
| 22|
+---+



### Creating calculated columns

In [4]:
from pyspark.sql.functions import col

# create a DataFrame
df = spark.createDataFrame([(1, 2), (3, 4), (5, 6)], ['a', 'b'])

# add a calculated column 'c' by multiplying 'a' and 'b'
df = df.withColumn('c', col('a') * col('b'))

# show the resulting DataFrame
df.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  2|
|  3|  4| 12|
|  5|  6| 30|
+---+---+---+



### Applying user defined function using UDF

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# create a DataFrame
df = spark.createDataFrame([(1, 2), (3, 4), (5, 6)], ['a', 'b'])

# define a Python function to apply to the column
def add_one(x):
    return x + 1

# create a UDF (User-Defined Function)
add_one_udf = udf(add_one, IntegerType())

# apply the UDF to the 'a' column
df = df.withColumn('a_plus_one', add_one_udf('a'))

# show the resulting DataFrame
df.show()

+---+---+----------+
|  a|  b|a_plus_one|
+---+---+----------+
|  1|  2|         2|
|  3|  4|         4|
|  5|  6|         6|
+---+---+----------+
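
The add_one function above assumes every value in the column is a number. Spark passes SQL NULL values to a Python UDF as None, and None + 1 raises a TypeError, so a UDF meant for real data usually guards against that. A minimal sketch of such a guard, testable without Spark (add_one_safe is a hypothetical name):

```python
def add_one_safe(x):
    """Return x + 1, or None when the input value is missing."""
    if x is None:
        return None
    return x + 1

# Simulate a column that contains a missing value.
results = [add_one_safe(v) for v in [1, None, 5]]
print(results)  # [2, None, 6]
```

It would be wrapped the same way as before, for example udf(add_one_safe, IntegerType()).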



In [6]:
# list the column names of the DataFrame
print(df.columns)

# count the rows in the DataFrame
print(df.count())

# first 5 rows (head returns a list of Row objects; df has only 3 rows)
print(df.head(5))

# last 5 rows (tail also returns a list of Row objects)
print(df.tail(5))

# select a column by indexing the DataFrame with a list of Column objects
print(df[[df.a]].head(5))

# select columns by passing their names as separate arguments
print(df.select('a', 'b').head(5))

# select columns by passing a list of names
print(df.select(['a', 'b']).head(5))

['a', 'b', 'a_plus_one']
3
[Row(a=1, b=2, a_plus_one=2), Row(a=3, b=4, a_plus_one=4), Row(a=5, b=6, a_plus_one=6)]
[Row(a=1, b=2, a_plus_one=2), Row(a=3, b=4, a_plus_one=4), Row(a=5, b=6, a_plus_one=6)]
[Row(a=1), Row(a=3), Row(a=5)]
[Row(a=1, b=2), Row(a=3, b=4), Row(a=5, b=6)]
[Row(a=1, b=2), Row(a=3, b=4), Row(a=5, b=6)]


### Applying and using filter and where

In [7]:
from pyspark.sql.functions import col
# using filter: a SQL string, a DataFrame attribute, and col() express the same condition
print(df.filter('a > 2').head(5))
print(df.filter(df.a > 2).head(5))
print(df.filter(col('a') > 2).head(5))

# using where, which is an alias for filter
print(df.where('a > 2').head(5))
print(df.where(df.a > 2).head(5))
print(df.where(col('a') > 2).head(5))

[Row(a=3, b=4, a_plus_one=4), Row(a=5, b=6, a_plus_one=6)]
[Row(a=3, b=4, a_plus_one=4), Row(a=5, b=6, a_plus_one=6)]
[Row(a=3, b=4, a_plus_one=4), Row(a=5, b=6, a_plus_one=6)]
[Row(a=3, b=4, a_plus_one=4), Row(a=5, b=6, a_plus_one=6)]
[Row(a=3, b=4, a_plus_one=4), Row(a=5, b=6, a_plus_one=6)]
[Row(a=3, b=4, a_plus_one=4), Row(a=5, b=6, a_plus_one=6)]


In [8]:
from pyspark.sql.functions import col
# combining conditions: "and" inside a SQL string, or & with each condition in parentheses
print(df.filter('a > 2 and b > 4').head(5))
print(df.filter((df.a > 2) & (df.b > 4)).head(5))
print(df.filter((col('a') > 2) & (col('b') > 4)).head(5))

[Row(a=5, b=6, a_plus_one=6)]
[Row(a=5, b=6, a_plus_one=6)]
[Row(a=5, b=6, a_plus_one=6)]


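### Iterating through a PySpark DataFrame

In [9]:
def print_row(row):
    print(row)

# foreach() runs the function on the executors, so row order is not guaranteed
# and on a cluster the output goes to the executor logs, not the driver console
df.foreach(print_row)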

Row(a=1, b=2, a_plus_one=2)
Row(a=5, b=6, a_plus_one=6)
Row(a=3, b=4, a_plus_one=4)


In [10]:
# collect() returns every row to the driver as a list, so use it only on small DataFrames
rows = df.collect()
for row in rows:
    # Perform some operations on the row
    print(row)

Row(a=1, b=2, a_plus_one=2)
Row(a=3, b=4, a_plus_one=4)
Row(a=5, b=6, a_plus_one=6)


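### Updating data in a PySpark DataFrame

In [11]:
from pyspark.sql.functions import col, when

df = spark.createDataFrame([('Rahul', 29), ("Aashish", 28), ("Aashu", 30), ('Sheetal',22)], ['name', 'age'])

# DataFrames are immutable: withColumn() returns a new DataFrame with the extra column.
# For Rahul with age < 30 the new column holds 30 - age; every other row keeps its age.
df = df.withColumn("lessthan30", when((col("name") == "Rahul") & (col("age") < 30), 30-col("age")).otherwise(col("age")))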
df.show()

+-------+---+----------+
|   name|age|lessthan30|
+-------+---+----------+
|  Rahul| 29|         1|
|Aashish| 28|        28|
|  Aashu| 30|        30|
|Sheetal| 22|        22|
+-------+---+----------+



In [12]:
# selectExpr() keeps only the listed SQL expressions, so the original age column is dropped
df = df.selectExpr(
    "CASE WHEN name = 'Rahul' AND age < 30 THEN 30 ELSE age END AS lessthan30",
    "CASE WHEN name = 'Sheetal' THEN 'Bittu' ELSE name END AS name"
)
df.show()

+----------+-------+
|lessthan30|   name|
+----------+-------+
|        30|  Rahul|
|        28|Aashish|
|        30|  Aashu|
|        22|  Bittu|
+----------+-------+



### Aggregating a PySpark DataFrame

In [13]:
from pyspark.sql import SparkSession
# create a SparkSession
spark = SparkSession.builder.appName("Pyspark aggregation").getOrCreate()

# Create a sample DataFrame
data = [("Alice", "F", 25), ("Bob", "M", 30), ("Charlie", "M", 35), ("Dave", "M", 40), ("Eve", "F", 45)]
df = spark.createDataFrame(data, ["name", "gender", "age"])

# Group the DataFrame by gender and compute the average age
agg_df = df.groupBy("gender").agg({"age": "avg"})

# Display the aggregated DataFrame
agg_df.show()

+------+--------+
|gender|avg(age)|
+------+--------+
|     F|    35.0|
|     M|    35.0|
+------+--------+



In [14]:
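# Note: the repeated "age" key collapses to {"age": "avg"}, so only the average is computed.
# For both aggregates use: df.groupBy("gender").agg(F.sum("age"), F.avg("age"))
# (with: from pyspark.sql import functions as F)
agg_df = df.groupBy("gender").agg({"age": "sum", "age": "avg"})

# Display the aggregated DataFrame
agg_df.show()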

+------+--------+
|gender|avg(age)|
+------+--------+
|     F|    35.0|
|     M|    35.0|
+------+--------+
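
The output above shows only avg(age) even though the dictionary also asked for a sum. This comes from Python rather than Spark: a dict literal keeps only the last value given for a repeated key, so agg() never receives the "sum" entry. The snippet below shows this in plain Python; spec is just an illustrative name.

```python
# The same dictionary literal that was passed to agg().
spec = {"age": "sum", "age": "avg"}

print(spec)       # {'age': 'avg'}
print(len(spec))  # 1
```

To compute several aggregates of one column, pass column expressions instead of a dict, for example agg(F.sum("age"), F.avg("age")) with pyspark.sql.functions imported as F.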

