In [47]:
#!pip install pyspark

A Resilient Distributed Dataset (RDD) is the fundamental data structure in Apache Spark, a fast, distributed processing engine for big data analytics. RDDs provide a fault-tolerant way to process and manipulate data in parallel across a cluster of machines.

Here's an overview of RDDs in PySpark:

**Creation of RDDs:** 
  RDDs can be created in PySpark in two main ways. The first is to distribute an existing collection from the driver program with the parallelize() method. The second is to load data from external storage such as the Hadoop Distributed File System (HDFS) or a local file system, for example with textFile(). Applying a transformation to an existing RDD also produces a new RDD.

**Immutable and Partitioned:** 
  RDDs are immutable, meaning they cannot be modified once created. Instead, transformations are applied to RDDs to create new RDDs. RDDs are also partitioned, where data is divided into smaller partitions that can be processed in parallel across a cluster.

**Resilience and Fault-Tolerance:** 
  RDDs are designed to be resilient, meaning they can recover from failures. RDDs achieve fault-tolerance by keeping track of the lineage, which is the history of transformations applied to the base data. If a partition is lost due to a failure, Spark can recompute the lost partition using the lineage.

**Transformation and Action Operations:** 
  RDDs support two types of operations: transformations and actions. Transformations, such as map(), filter(), and reduceByKey(), produce new RDDs from existing ones. Actions, such as count(), collect(), and reduce(), trigger the computation and either return a result to the driver program or write data to external storage.

**Lazy Evaluation:** 
  RDDs follow lazy evaluation: transformations are not executed immediately. Spark only records them in the RDD's lineage and waits until an action is called. At that point it builds an execution plan, pipelining consecutive transformations that need no shuffle into a single stage, so that only the work required for the requested result is performed.

**Caching:** 
  RDDs can be cached in memory to improve performance. By caching an RDD, intermediate results can be stored in memory, allowing for faster access and reuse of data across multiple computations. Caching is particularly useful when an RDD is accessed multiple times or when iterative algorithms are applied.

RDDs form the core data structure in PySpark, enabling distributed data processing and computation. They provide a flexible, lower-level API that lets developers write parallel and scalable data processing code. Note, however, that Spark also offers higher-level structured APIs, DataFrames and (in Scala and Java only) Datasets, which benefit from the Catalyst query optimizer and offer a more expressive API than RDDs; they are generally preferred for new code.

In PySpark, a schema defines the structure of a DataFrame: the column names, their data types, and whether each column may contain null values. Declaring the schema explicitly documents the data, lets Spark validate incoming rows, and allows it to optimize query execution.

Here's an example of how to define a schema in PySpark using the pyspark.sql.types module:

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Define the schema
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True),
    StructField("city", StringType(), nullable=True)
])

# Create a DataFrame with the defined schema
data = [("John", 25, "New York"), ("Alice", 30, "London"), ("Bob", 35, "Paris")]
df = spark.createDataFrame(data, schema)

# Display the DataFrame
df.show()


+-----+---+--------+
| name|age|    city|
+-----+---+--------+
| John| 25|New York|
|Alice| 30|  London|
|  Bob| 35|   Paris|
+-----+---+--------+



In the example above, we define a schema using the StructType class and specify the column names and data types using StructField. In this case, we have three columns: "name" (StringType), "age" (IntegerType), and "city" (StringType). We set the nullable parameter to False for the "name" column to indicate that it cannot contain null values.

We then create a DataFrame df by providing the data and the defined schema to the createDataFrame() method. Finally, we display the DataFrame using the show() method.

## Lambda Expressions in PySpark

Lambda expressions are anonymous functions used to define small, inline functions. In Python, a lambda is limited to a single expression. In PySpark, the Python API for Apache Spark, lambdas are a convenient way to define functions on the fly.

Lambda expressions in PySpark are typically used in conjunction with higher-order functions such as *map(), filter(), and reduce()* to perform operations on distributed collections of data. These higher-order functions take other functions as arguments, and lambda expressions provide a convenient way to define these functions without explicitly naming them.

Here's an example of using a lambda expression with the map() function in PySpark to square each element in an RDD (Resilient Distributed Dataset):

In [6]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create an RDD with some numbers
numbers_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# Use map() with a lambda expression to square each number
squared_rdd = numbers_rdd.map(lambda x: x ** 2)

# Collect the results
result = squared_rdd.collect()
print(result)


[1, 4, 9, 16, 25]


### map() function:
  The map() function applies a given function to each element of an RDD and returns a new RDD with the transformed elements. Lambda expressions can be used to define the function to be applied. For example:

In [42]:
numbers_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
squared_rdd = numbers_rdd.map(lambda x: x ** 2)
squared_rdd.collect()  # map() is lazy; collect() is the action that runs it

[1, 4, 9, 16, 25]

### filter() function:
  The filter() function creates a new RDD by selecting the elements that satisfy a given condition. Lambda expressions can be used to define the condition. For example:

In [43]:
numbers_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
even_rdd = numbers_rdd.filter(lambda x: x % 2 == 0)
even_rdd.collect()  # [2, 4]


### reduce() function:
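The reduce() function aggregates the elements of an RDD using a specified binary operator, which can be defined with a lambda expression. Because Spark first reduces each partition separately and then combines the partial results, the operator should be commutative and associative. For example, to find the sum of all elements in an RDD: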

In [44]:
numbers_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
sum_of_elements = numbers_rdd.reduce(lambda x, y: x + y)
print(sum_of_elements)  # 15


### Sorting with sortBy() function:
The sortBy() function sorts the elements of an RDD based on a specified key. Lambda expressions can be used to define the key. For example, to sort an RDD of tuples based on the second element:

In [45]:
data_rdd = spark.sparkContext.parallelize([(1, 'a'), (2, 'c'), (3, 'b')])
sorted_rdd = data_rdd.sortBy(lambda x: x[1])
sorted_rdd.collect()  # [(1, 'a'), (3, 'b'), (2, 'c')]


### Custom transformations:
Lambda expressions can be used to define custom transformations on RDDs. For example, to transform an RDD by adding a constant value to each element:

In [46]:
numbers_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
transformed_rdd = numbers_rdd.map(lambda x: x + 10)
transformed_rdd.collect()  # [11, 12, 13, 14, 15]


## Transformations in PySpark

In PySpark, transformations are operations that are applied to a DataFrame or an RDD (Resilient Distributed Dataset) to create a new DataFrame or RDD. Transformations are lazy operations, meaning they are not executed immediately but create a directed acyclic graph (DAG) representing the computation. The actual execution is triggered when an action is called on the DataFrame or RDD.

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Define the schema
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True),
    StructField("city", StringType(), nullable=True)
])

# Create a DataFrame with the defined schema
data = [("John", 25, "New York"), ("Alice", 30, "London"), ("Bob", 35, "Paris")]
df = spark.createDataFrame(data, schema)
df.show()

+-----+---+--------+
| name|age|    city|
+-----+---+--------+
| John| 25|New York|
|Alice| 30|  London|
|  Bob| 35|   Paris|
+-----+---+--------+



In [10]:
df.select("name", "age").show()


+-----+---+
| name|age|
+-----+---+
| John| 25|
|Alice| 30|
|  Bob| 35|
+-----+---+



In [11]:
df.filter(df.age > 30).show()


+----+---+-----+
|name|age| city|
+----+---+-----+
| Bob| 35|Paris|
+----+---+-----+



In [14]:
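from pyspark.sql.functions import concat_ws

# + performs numeric addition, so df.name + df.city yields null (or raises an
# error when ANSI mode is enabled); concat_ws() joins string columns with a separator.
df.withColumn("new_column", concat_ws(" ", df.name, df.city)).show()


+-----+---+--------+-------------+
| name|age|    city|   new_column|
+-----+---+--------+-------------+
| John| 25|New York|John New York|
|Alice| 30|  London| Alice London|
|  Bob| 35|   Paris|    Bob Paris|
+-----+---+--------+-------------+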



In [15]:
df.groupBy("name").sum("age").show()


+-----+--------+
| name|sum(age)|
+-----+--------+
| John|      25|
|  Bob|      35|
|Alice|      30|
+-----+--------+



In [17]:
df.orderBy("age", ascending=False).show()

+-----+---+--------+
| name|age|    city|
+-----+---+--------+
|  Bob| 35|   Paris|
|Alice| 30|  London|
| John| 25|New York|
+-----+---+--------+



In [19]:
df.distinct().show()

+-----+---+--------+
| name|age|    city|
+-----+---+--------+
| John| 25|New York|
|  Bob| 35|   Paris|
|Alice| 30|  London|
+-----+---+--------+



In [20]:
# Define the schema
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True),
    StructField("city", StringType(), nullable=True)
])

# Create a DataFrame with the defined schema
data1 = [("k", 25, "New York"), ("A", 30, "London"), ("B", 35, "Paris")]
df1 = spark.createDataFrame(data1, schema)
df1.show()

+----+---+--------+
|name|age|    city|
+----+---+--------+
|   k| 25|New York|
|   A| 30|  London|
|   B| 35|   Paris|
+----+---+--------+



In [23]:
df1.join(df, df1.city == df.city, "inner").show()


+----+---+--------+-----+---+--------+
|name|age|    city| name|age|    city|
+----+---+--------+-----+---+--------+
|   A| 30|  London|Alice| 30|  London|
|   k| 25|New York| John| 25|New York|
|   B| 35|   Paris|  Bob| 35|   Paris|
+----+---+--------+-----+---+--------+



In [25]:
df1.union(df).show()

+-----+---+--------+
| name|age|    city|
+-----+---+--------+
|    k| 25|New York|
|    A| 30|  London|
|    B| 35|   Paris|
| John| 25|New York|
|Alice| 30|  London|
|  Bob| 35|   Paris|
+-----+---+--------+



In [26]:
df.drop("name").show()

+---+--------+
|age|    city|
+---+--------+
| 25|New York|
| 30|  London|
| 35|   Paris|
+---+--------+



In [30]:
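# Each element of df.rdd is a Row (a tuple subclass), so x * 2 would repeat the
# row's fields rather than double a value; access fields by name instead.
df.rdd.map(lambda row: (row.name, row.age * 2)).collect()


[('John', 50), ('Alice', 60), ('Bob', 70)]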

## Actions in PySpark

In PySpark, actions are operations that trigger the execution of transformations and return results or perform some computation on a DataFrame or an RDD (Resilient Distributed Dataset). Unlike transformations, actions are eagerly executed and trigger the actual computation of the data.

In [31]:
# show(): Displays the first rows of a DataFrame (20 by default)
df.show()

+-----+---+--------+
| name|age|    city|
+-----+---+--------+
| John| 25|New York|
|Alice| 30|  London|
|  Bob| 35|   Paris|
+-----+---+--------+



In [32]:
# count(): Returns the number of rows in a DataFrame.
df.count()


3

In [33]:
# collect(): Returns all the rows of a DataFrame as a list in the driver program.

df.collect()


[Row(name='John', age=25, city='New York'),
 Row(name='Alice', age=30, city='London'),
 Row(name='Bob', age=35, city='Paris')]

In [34]:
df.first()  # first(): Returns the first row of a DataFrame.


Row(name='John', age=25, city='New York')

In [35]:
# take(n): Returns the first n rows of a DataFrame as a list.
df.take(2)


[Row(name='John', age=25, city='New York'),
 Row(name='Alice', age=30, city='London')]

In [38]:
# reduce(): Applies a binary function to the elements of an RDD and returns the result.
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
rdd.reduce(lambda x, y: x + y)  # 15


In [None]:
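# foreach(): Applies a function to each element of an RDD for its side effects and returns nothing.
# print() runs on the executors, so its output goes to the executor logs
# (in local mode, typically the terminal that launched Spark), not to the notebook.
rdd.foreach(lambda x: print(x))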


In [None]:
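# saveAsTextFile(): Saves the contents of an RDD as text, one element per line.
# The path names a directory (one part file per partition) that must not already exist.
rdd.saveAsTextFile("output_dir")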


In [39]:
# toPandas(): Collects all rows to the driver as a pandas DataFrame (requires pandas).
pandas_df = df.toPandas()


In [40]:
pandas_df

    name  age      city
0   John   25  New York
1  Alice   30    London
2    Bob   35     Paris
