In [14]:
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!pip install pyspark

# install if needed

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import random
from pyspark.sql import functions as F

# Create Spark session
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Test with a sample DataFrame
# Define schema
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("City", StringType(), True)
])

# Create some sample data
names = ["Alice", "Bob", "Cathy", "David", "Eva", "Frank", "Grace", "Henry", "Ivy", "Jack"]
cities = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix", "Philadelphia", "San Antonio", "San Diego", "Dallas", "San Jose"]

# Generate 50 rows of random data
data = [(random.choice(names), random.randint(20, 60), random.choice(cities)) for _ in range(50)]

# Create the DataFrame
df = spark.createDataFrame(data, schema)

df.show()


+-----+---+------------+
| Name|Age|        City|
+-----+---+------------+
|Alice| 26|   San Diego|
|  Ivy| 25|     Houston|
|Grace| 45|     Chicago|
|  Ivy| 20| San Antonio|
|Henry| 22|   San Diego|
| Jack| 44|    San Jose|
|Cathy| 22|      Dallas|
|David| 53|    New York|
|  Bob| 53|    San Jose|
|Henry| 41|      Dallas|
|  Bob| 46| San Antonio|
|  Eva| 28|    San Jose|
|Frank| 51| San Antonio|
|Grace| 25|      Dallas|
|David| 28|Philadelphia|
|Henry| 22|      Dallas|
|Henry| 55|   San Diego|
|David| 33|   San Diego|
|Henry| 31|      Dallas|
|  Bob| 23|     Phoenix|
+-----+---+------------+
only showing top 20 rows



Spark performs parallel processing on data. It uses concept of HDFC and YARN which are same as in Hadoop

In [16]:
df.show(5)

+-----+---+-----------+
| Name|Age|       City|
+-----+---+-----------+
|Alice| 26|  San Diego|
|  Ivy| 25|    Houston|
|Grace| 45|    Chicago|
|  Ivy| 20|San Antonio|
|Henry| 22|  San Diego|
+-----+---+-----------+
only showing top 5 rows



In [17]:
df = df.withColumnRenamed("_3","City")
df.show()

+-----+---+------------+
| Name|Age|        City|
+-----+---+------------+
|Alice| 26|   San Diego|
|  Ivy| 25|     Houston|
|Grace| 45|     Chicago|
|  Ivy| 20| San Antonio|
|Henry| 22|   San Diego|
| Jack| 44|    San Jose|
|Cathy| 22|      Dallas|
|David| 53|    New York|
|  Bob| 53|    San Jose|
|Henry| 41|      Dallas|
|  Bob| 46| San Antonio|
|  Eva| 28|    San Jose|
|Frank| 51| San Antonio|
|Grace| 25|      Dallas|
|David| 28|Philadelphia|
|Henry| 22|      Dallas|
|Henry| 55|   San Diego|
|David| 33|   San Diego|
|Henry| 31|      Dallas|
|  Bob| 23|     Phoenix|
+-----+---+------------+
only showing top 20 rows



In [18]:
df = df.filter(df.Age > 30)
df.show()

+-----+---+-----------+
| Name|Age|       City|
+-----+---+-----------+
|Grace| 45|    Chicago|
| Jack| 44|   San Jose|
|David| 53|   New York|
|  Bob| 53|   San Jose|
|Henry| 41|     Dallas|
|  Bob| 46|San Antonio|
|Frank| 51|San Antonio|
|Henry| 55|  San Diego|
|David| 33|  San Diego|
|Henry| 31|     Dallas|
|Grace| 53|San Antonio|
|Grace| 36|   New York|
|David| 38|    Phoenix|
| Jack| 31|    Chicago|
|Frank| 39|   San Jose|
|Alice| 39|   New York|
|Cathy| 43|    Phoenix|
| Jack| 60|   San Jose|
|Frank| 42|   San Jose|
|Alice| 41|   New York|
+-----+---+-----------+
only showing top 20 rows



In [19]:
df = df.unionByName(df)
df.show()

+-----+---+-----------+
| Name|Age|       City|
+-----+---+-----------+
|Grace| 45|    Chicago|
| Jack| 44|   San Jose|
|David| 53|   New York|
|  Bob| 53|   San Jose|
|Henry| 41|     Dallas|
|  Bob| 46|San Antonio|
|Frank| 51|San Antonio|
|Henry| 55|  San Diego|
|David| 33|  San Diego|
|Henry| 31|     Dallas|
|Grace| 53|San Antonio|
|Grace| 36|   New York|
|David| 38|    Phoenix|
| Jack| 31|    Chicago|
|Frank| 39|   San Jose|
|Alice| 39|   New York|
|Cathy| 43|    Phoenix|
| Jack| 60|   San Jose|
|Frank| 42|   San Jose|
|Alice| 41|   New York|
+-----+---+-----------+
only showing top 20 rows



In [20]:
df1 = df.groupby("Name").count()
df1.show()

+-----+-----+
| Name|count|
+-----+-----+
|Grace|   10|
| Jack|   10|
|  Bob|    4|
|David|    8|
|Frank|    6|
|Henry|    8|
|  Ivy|    4|
|Alice|    6|
|Cathy|    6|
+-----+-----+



In [21]:
df1 = df.filter(df["Name"].endswith("e"))
df1.show()

+-----+---+-----------+
| Name|Age|       City|
+-----+---+-----------+
|Grace| 45|    Chicago|
|Grace| 53|San Antonio|
|Grace| 36|   New York|
|Alice| 39|   New York|
|Alice| 41|   New York|
|Alice| 48|    Chicago|
|Grace| 32|Los Angeles|
|Grace| 55|San Antonio|
|Grace| 45|    Chicago|
|Grace| 53|San Antonio|
|Grace| 36|   New York|
|Alice| 39|   New York|
|Alice| 41|   New York|
|Alice| 48|    Chicago|
|Grace| 32|Los Angeles|
|Grace| 55|San Antonio|
+-----+---+-----------+



In [22]:
df.collect()

[Row(Name='Grace', Age=45, City='Chicago'),
 Row(Name='Jack', Age=44, City='San Jose'),
 Row(Name='David', Age=53, City='New York'),
 Row(Name='Bob', Age=53, City='San Jose'),
 Row(Name='Henry', Age=41, City='Dallas'),
 Row(Name='Bob', Age=46, City='San Antonio'),
 Row(Name='Frank', Age=51, City='San Antonio'),
 Row(Name='Henry', Age=55, City='San Diego'),
 Row(Name='David', Age=33, City='San Diego'),
 Row(Name='Henry', Age=31, City='Dallas'),
 Row(Name='Grace', Age=53, City='San Antonio'),
 Row(Name='Grace', Age=36, City='New York'),
 Row(Name='David', Age=38, City='Phoenix'),
 Row(Name='Jack', Age=31, City='Chicago'),
 Row(Name='Frank', Age=39, City='San Jose'),
 Row(Name='Alice', Age=39, City='New York'),
 Row(Name='Cathy', Age=43, City='Phoenix'),
 Row(Name='Jack', Age=60, City='San Jose'),
 Row(Name='Frank', Age=42, City='San Jose'),
 Row(Name='Alice', Age=41, City='New York'),
 Row(Name='Alice', Age=48, City='Chicago'),
 Row(Name='Henry', Age=55, City='New York'),
 Row(Name='Davi

**Sparks performs parallel processing with help of HDFS and YARN.
They are the same methods of Hadoop. The advantage of Spark over Hadoop is real time processing whereas Hadoop performs batch processing**

They are two types of variables for helping spark in parallel processing:
1. Broadcast  2. Accumulator

In [24]:
df.show(5)

+-----+---+--------+
| Name|Age|    City|
+-----+---+--------+
|Grace| 45| Chicago|
| Jack| 44|San Jose|
|David| 53|New York|
|  Bob| 53|San Jose|
|Henry| 41|  Dallas|
+-----+---+--------+
only showing top 5 rows



In [25]:
df.describe() # to know data types of columns

DataFrame[summary: string, Name: string, Age: string, City: string]

In [30]:
(df.filter(df["Name"].isNull())).show() # to know null value in  df

+----+---+----+
|Name|Age|City|
+----+---+----+
+----+---+----+



In [38]:
print(df.filter(df["Name"].startswith("A")).show())
print(df.filter(df["Name"].endswith("y")).show())
print(df.filter(df["Name"].contains("b")).show())



+-----+---+--------+
| Name|Age|    City|
+-----+---+--------+
|Alice| 39|New York|
|Alice| 41|New York|
|Alice| 48| Chicago|
|Alice| 39|New York|
|Alice| 41|New York|
|Alice| 48| Chicago|
+-----+---+--------+

None
+-----+---+---------+
| Name|Age|     City|
+-----+---+---------+
|Henry| 41|   Dallas|
|Henry| 55|San Diego|
|Henry| 31|   Dallas|
|Cathy| 43|  Phoenix|
|Henry| 55| New York|
|Cathy| 45|  Houston|
|  Ivy| 60| New York|
|  Ivy| 46|   Dallas|
|Cathy| 37|   Dallas|
|Henry| 41|   Dallas|
|Henry| 55|San Diego|
|Henry| 31|   Dallas|
|Cathy| 43|  Phoenix|
|Henry| 55| New York|
|Cathy| 45|  Houston|
|  Ivy| 60| New York|
|  Ivy| 46|   Dallas|
|Cathy| 37|   Dallas|
+-----+---+---------+

None
+----+---+-----------+
|Name|Age|       City|
+----+---+-----------+
| Bob| 53|   San Jose|
| Bob| 46|San Antonio|
| Bob| 53|   San Jose|
| Bob| 46|San Antonio|
+----+---+-----------+

None
+-----+---+-----------+
| Name|Age|       City|
+-----+---+-----------+
|Grace| 45|    Chicago|
| Jack| 

In [52]:
df.filter(df["Name"]>="Ab").show()


+-----+---+-----------+
| Name|Age|       City|
+-----+---+-----------+
|Grace| 45|    Chicago|
| Jack| 44|   San Jose|
|David| 53|   New York|
|  Bob| 53|   San Jose|
|Henry| 41|     Dallas|
|  Bob| 46|San Antonio|
|Frank| 51|San Antonio|
|Henry| 55|  San Diego|
|David| 33|  San Diego|
|Henry| 31|     Dallas|
|Grace| 53|San Antonio|
|Grace| 36|   New York|
|David| 38|    Phoenix|
| Jack| 31|    Chicago|
|Frank| 39|   San Jose|
|Alice| 39|   New York|
|Cathy| 43|    Phoenix|
| Jack| 60|   San Jose|
|Frank| 42|   San Jose|
|Alice| 41|   New York|
+-----+---+-----------+
only showing top 20 rows



In [54]:
df.select(["Name","Age","City"]).where(df["Age"]>30).show(5)

+-----+---+--------+
| Name|Age|    City|
+-----+---+--------+
|Grace| 45| Chicago|
| Jack| 44|San Jose|
|David| 53|New York|
|  Bob| 53|San Jose|
|Henry| 41|  Dallas|
+-----+---+--------+
only showing top 5 rows



In [60]:
df.select(["City"]).where(df["Age"]>30).groupby(df["City"]).count().show()


+-----------+-----+
|       City|count|
+-----------+-----+
|    Phoenix|    4|
|     Dallas|    8|
|San Antonio|    8|
|  San Diego|    4|
|    Chicago|    6|
|   San Jose|   14|
|   New York|   12|
|Los Angeles|    2|
|    Houston|    4|
+-----------+-----+



In [69]:
df.select(["City"]).where((df["Age"]>20) & (df["Age"]<=40)).distinct().orderBy(df["City"]).show()


+-----------+
|       City|
+-----------+
|    Chicago|
|     Dallas|
|Los Angeles|
|   New York|
|    Phoenix|
|  San Diego|
|   San Jose|
+-----------+



In [71]:
df.select(["City"]).where(df["Age"]>30).distinct().orderBy(F.length(df["City"])).show()


+-----------+
|       City|
+-----------+
|     Dallas|
|    Phoenix|
|    Chicago|
|    Houston|
|   San Jose|
|   New York|
|  San Diego|
|San Antonio|
|Los Angeles|
+-----------+



In [73]:
df.select(["City"]).where(df["Age"]>30).distinct().orderBy(F.substring(df["City"],1,4)).show() # sorting on basis of first 4 letters of City column


+-----------+
|       City|
+-----------+
|    Chicago|
|     Dallas|
|    Houston|
|Los Angeles|
|   New York|
|    Phoenix|
|San Antonio|
|  San Diego|
|   San Jose|
+-----------+

