In [None]:
!pip install pyspark py4j

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)

What is SparkSession?

SparkSession was introduced in Spark 2.0. It is the entry point to the underlying Spark functionality for programmatically creating RDDs, DataFrames, and Datasets. The SparkSession object spark is available by default in spark-shell, and it can also be created programmatically using the SparkSession builder pattern.

sparkContext:

*pyspark.SparkContext is an entry point to the PySpark functionality that is used to communicate with the cluster and to create an RDD, accumulator, and broadcast variable*


In [None]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').master("local[5]").getOrCreate()
sparkContext=spark.sparkContext
rdd=sparkContext.parallelize([1,2,3,4,5])
rddCollect = rdd.collect()
print("Number of Partitions: "+str(rdd.getNumPartitions()))
print("Action: First element: "+str(rdd.first()))
print(rddCollect)
# Only one SparkContext can be active per JVM; to create another one,
# first stop the existing one with the stop() method.
#sparkContext.stop()

Number of Partitions: 5
Action: First element: 1
[1, 2, 3, 4, 5]


RDD Introduction

RDD (Resilient Distributed Dataset) is a core building block of PySpark. It is a fault-tolerant, immutable, distributed collection of objects. Immutable means that once you create an RDD, you cannot change it. The data within RDDs is segmented into logical partitions, allowing for distributed computation across multiple nodes within the cluster.

* it is a collection of rows without schema
* it is immutable
* its transformations are evaluated lazily: nothing is computed until an action (such as collect()) is called
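
Immutability and lazy evaluation can be illustrated without a cluster. The sketch below is a plain-Python model, not Spark code: `LazyDataset` and `traced_double` are hypothetical names invented for this illustration. `map()` and `filter()` only record work, and `collect()` plays the role of an action.

```python
class LazyDataset:
    """Toy model of an RDD: immutable, with lazily evaluated transformations."""

    def __init__(self, source, steps=()):
        self._source = tuple(source)  # the data is never modified in place
        self._steps = tuple(steps)    # recorded transformations (the "lineage")

    def map(self, fn):
        # Returns a NEW dataset; nothing is computed yet.
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        # Also returns a new dataset and defers the work.
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    def collect(self):
        # The "action": replay every recorded step over the source data.
        rows = list(self._source)
        for kind, fn in self._steps:
            if kind == "map":
                rows = [fn(r) for r in rows]
            else:
                rows = [r for r in rows if fn(r)]
        return rows


calls = []

def traced_double(x):
    calls.append(x)  # record that the function actually ran
    return x * 2

base = LazyDataset([1, 2, 3, 4, 5])
doubled = base.map(traced_double).filter(lambda x: x > 4)

calls_before_action = list(calls)
print(calls_before_action)  # [] : no work has happened yet
print(doubled.collect())    # [6, 8, 10]
print(base.collect())       # [1, 2, 3, 4, 5] : the original is unchanged
```

Real RDDs add partitioning and fault tolerance on top of this idea; the recorded steps correspond loosely to the lineage Spark uses to recompute lost partitions.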


**Spark DataFrame**

In Spark, a DataFrame is a distributed collection of data organized into named columns, similar to an SQL table.

* It is similar to a table in a relational database or a spreadsheet in that it has a schema, which defines the types and names of its columns, and each row represents a single record or observation.
* DataFrames can be created from a variety of sources, such as RDDs, structured data files (e.g., CSV, JSON, Parquet), Hive tables, or external databases.
* Once created, DataFrames support a wide range of operations and transformations, such as filtering, aggregating, joining, and grouping data.
* One of the key benefits of DataFrames is their ability to leverage Spark’s distributed computing capabilities to process large amounts of data quickly and efficiently.


RDD Creation

Using sparkContext.parallelize()



In [None]:
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.master("local[1]").getOrCreate()
sc=spark.sparkContext
data=[1,2,3,4,5,6,7,8,9,10,11,12]
rdd=sc.parallelize(data)
print(rdd.collect())


[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]


Using File Location:



In [None]:
rdd = spark.sparkContext.textFile("sample_data/mnist_test.csv")
print(rdd.count())

10000


In [None]:
rdd1=spark.read.csv(path="sample_data/mnist_test.csv",header=True,inferSchema=True)
rdd1.show(1)
rdd2=spark.read.json(path="sample_data/anscombe.json")
rdd2.show()
# Example of using option()
# the write format defaults to parquet when format() is not specified
rdd2.write.format("csv")  \
        .option("header", "true")  \
        .option("delimiter", "|")  \
        .save("sample_data/output")

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|  NULL|NULL| NULL|              [|
|     I|10.0| 8.04|           NULL|
|     I| 8.0| 6.95|           NULL|
|     I|13.0| 7.58|           NULL|
|     I| 9.0| 8.81|           NULL|
|     I|11.0| 8.33|           NULL|
|     I|14.0| 9.96|           NULL|
|     I| 6.0| 7.24|           NULL|
|     I| 4.0| 4.26|           NULL|
|     I|12.0|10.84|           NULL|
|     I| 7.0| 4.81|           NULL|
|     I| 5.0| 5.68|           NULL|
|    II|10.0| 9.14|           NULL|
|    II| 8.0| 8.14|           NULL|
|    II|13.0| 8.74|           NULL|
|    II| 9.0| 8.77|           NULL|
|    II|11.0| 9.26|           NULL|
|    II|14.0|  8.1|           NULL|
|    II| 6.0| 6.13|           NULL|
|    II| 4.0|  3.1|           NULL|
+------+----+-----+---------------+
only showing top 20 rows



PySpark RDD repartition() vs coalesce():

In PySpark, choosing between repartition() and coalesce() matters for performance and resource utilization. Both change how data is distributed across the partitions of an RDD or DataFrame, but they differ in mechanism and cost.


In [None]:
# No session is created here: the default partition count comes from the
# existing session's master setting (local[5])
rdd = spark.sparkContext.parallelize(range(0,20))
print("From local[5] : "+str(rdd.getNumPartitions()))

# Use parallelize with 6 partitions
rdd1 = spark.sparkContext.parallelize(range(0,25), 6)
print("parallelize : "+str(rdd1.getNumPartitions()))



From local[5] : 5
parallelize : 6


RDD repartition():

repartition() is a transformation method available on RDDs (Resilient Distributed Datasets) that redistributes data across a specified number of partitions. When you call repartition(n), where n is the desired number of partitions, Spark reshuffles the data in the RDD into exactly n partitions.



* It increases or decreases the number of partitions by redistributing all data across the specified number of partitions.
* A full shuffle is performed in repartition().
* It can be expensive for large datasets.
* It distributes data evenly across partitions (each partition ends up roughly the same size).

In [None]:
rdd2 = rdd1.repartition(4)
print("Repartition size : "+str(rdd2.getNumPartitions()))
rdd2.saveAsTextFile("re-partition.txt")

'''
Partition 1 : 1 6 10 15 19
Partition 2 : 2 3 7 11 16
Partition 3 : 4 8 12 13 17
Partition 4 : 0 5 9 14 18
'''

Repartition size : 4


RDD coalesce():


In PySpark, coalesce() is a transformation method available on RDDs (Resilient Distributed Datasets) that reduces the number of partitions without shuffling data across the cluster. When you call coalesce(n), where n is the desired number of partitions, Spark merges existing partitions to create n partitions.

* It decreases the number of partitions by merging existing partitions, without shuffling all the data.
* No shuffle is performed in coalesce().
* It is less expensive than repartition().
* The resulting partitions can be unevenly sized.
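
The difference between the two can be sketched in plain Python. This is a simplified model, not Spark's actual placement algorithm: the functions `repartition` and `coalesce` below are hypothetical stand-ins that work on a list of lists, and the input data is made up for illustration.

```python
def repartition(partitions, n):
    """Model of a full shuffle: every element may move; sizes end up balanced."""
    out = [[] for _ in range(n)]
    flat = [x for part in partitions for x in part]
    for i, x in enumerate(flat):
        out[i % n].append(x)  # round-robin placement, element by element
    return out


def coalesce(partitions, n):
    """Model of merging without a shuffle: whole input partitions are combined.

    Assumes n <= len(partitions), since this model can only reduce the count.
    """
    out = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        out[i * n // len(partitions)].extend(part)  # partitions stay intact
    return out


# Six unevenly sized input partitions holding the numbers 0..15
parts = [[0, 1], [2, 3, 4, 5, 6], [7], [8, 9, 10], [11, 12, 13, 14], [15]]

repartitioned_sizes = [len(p) for p in repartition(parts, 4)]
coalesced_sizes = [len(p) for p in coalesce(parts, 4)]

print(repartitioned_sizes)  # [4, 4, 4, 4]
print(coalesced_sizes)      # [7, 1, 7, 1]
```

In this model the repartitioned output is balanced because every element is placed individually, while the coalesced output inherits the imbalance of the partitions that were merged.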

In [None]:
rdd3 = rdd1.coalesce(4)
print("Coalesce size : "+str(rdd3.getNumPartitions()))
#rdd3.saveAsTextFile("/tmp/coalesce")

'''Partition 1 : 0 1 2
Partition 2 : 3 4 5 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15 16 17 18 19'''

Broadcast Variables:

Broadcast variables are read-only shared variables that are cached and made available on all nodes in a cluster so that tasks can access them. Instead of sending this data along with every task, PySpark distributes broadcast variables to the workers using efficient broadcast algorithms to reduce communication costs.


How does PySpark Broadcast work?

When you run a PySpark RDD or DataFrame application that defines and uses broadcast variables, PySpark does the following:

* PySpark breaks the job into stages separated by distributed shuffles; actions are executed within a stage.

* Each stage is then broken into tasks.

* Spark broadcasts the common (reusable) data needed by the tasks within each stage.

* The broadcast data is cached in serialized form and deserialized before each task runs.

In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

rdd = spark.sparkContext.parallelize(data)

def state_convert(code):
    return broadcastStates.value[code]

result = rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).collect()
print(result)

[('James', 'Smith', 'USA', 'California'), ('Michael', 'Rose', 'USA', 'New York'), ('Robert', 'Williams', 'USA', 'California'), ('Maria', 'Jones', 'USA', 'Florida')]


Accumulator:

The PySpark Accumulator is a shared variable used with RDDs and DataFrames to perform sum and counter operations, similar to MapReduce counters. All executors can add to these variables through associative and commutative operations.


What is PySpark Accumulator?

Accumulators are write-only variables that are initialized once. Only tasks running on workers may update them, and updates from the workers are propagated automatically to the driver program. Only the driver program can read an accumulator, through its value property.

In [None]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("accumulator").getOrCreate()

accum=spark.sparkContext.accumulator(0)
rdd=spark.sparkContext.parallelize([1,2,3,4,5])
rdd.foreach(lambda x:accum.add(x))
print(accum.value) #Accessed by driver

15


Create Empty RDD in PySpark:

Create an empty RDD by using emptyRDD() of SparkContext for example spark.sparkContext.emptyRDD().

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

#Creates Empty RDD
emptyRDD = spark.sparkContext.emptyRDD()
print(emptyRDD)

#Displays
#EmptyRDD[188] at emptyRDD

EmptyRDD[32] at emptyRDD at NativeMethodAccessorImpl.java:0


Create Empty DataFrame with Schema (StructType):



In [None]:
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])

#Create empty DataFrame from empty RDD
df = spark.createDataFrame(emptyRDD,schema)
df.printSchema()
#convert RDD to DF
df1 = emptyRDD.toDF(schema)
df1.printSchema()

#Create empty DataFrame directly.
df2 = spark.createDataFrame([], schema)
df2.printSchema()


#Create empty DataFrame with no schema (no columns)
df3 = spark.createDataFrame([], StructType([]))
df3.printSchema()

#print below empty schema
#root

Convert RDD to DF:

1. Using the rdd.toDF() function

PySpark provides the toDF() function on RDDs, which converts an RDD into a DataFrame.

2. Using the PySpark createDataFrame() function

The SparkSession class provides the createDataFrame() method, which takes an RDD object as an argument and returns a DataFrame.

In [None]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]
rdd = spark.sparkContext.parallelize(dept)
#using toDF function
df = rdd.toDF()
df.printSchema()
df.show(truncate=False)
# with selected columns
deptColumns = ["dept_name","dept_id"]
df2 = rdd.toDF(deptColumns)
df2.printSchema()
df2.show(truncate=False)
#using createDataFrame function
'''deptSchema = StructType([
    StructField('dept_name', StringType(), True),
    StructField('dept_id', StringType(), True)
])'''
df3 = spark.createDataFrame(rdd,["dept_name","dept_id"]) #or (rdd,schema=deptSchema)
df3.printSchema()
df3.show(truncate=False)

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+---------+---+
|_1       |_2 |
+---------+---+
|Finance  |10 |
|Marketing|20 |
|Sales    |30 |
|IT       |40 |
+---------+---+

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



Convert PySpark Dataframe to Pandas DataFrame

PySpark DataFrame provides a method toPandas() to convert it to Python Pandas DataFrame.

toPandas() collects all records of the PySpark DataFrame to the driver program, so it should be used only on a small subset of the data. Running it on a larger dataset can cause an out-of-memory error and crash the application. To handle a larger dataset, you can also try increasing the driver's memory.

StructType – Defines the structure of the DataFrame

StructField – Defines the metadata of the DataFrame column

A StructField represents a field in the schema and holds its name, data type, and nullable flag. Each StructField object defines a single column in the DataFrame.
The example below builds a nested StructType.


In [None]:
# Nested structure elements
import pyspark
from pyspark.sql.types import StructType, StructField, StringType,IntegerType
dataStruct = [(("James","","Smith"),"36636","M","3000"), \
      (("Michael","Rose",""),"40288","M","4000"), \
      (("Robert","","Williams"),"42114","M","4000"), \
      (("Maria","Anne","Jones"),"39192","F","4000"), \
      (("Jen","Mary","Brown"),"","F","-1") \
]

schemaStruct = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),\
             StructField('middlename', StringType(), True),\
             StructField('lastname', StringType(), True)\
             ])),\
          StructField('dob', StringType(), True),\
         StructField('gender', StringType(), True),\
         StructField('salary', StringType(), True)\
         ])
df = spark.createDataFrame(data=dataStruct, schema = schemaStruct)
df.printSchema()
#using toPandas()
pandasDF2 = df.toPandas()
print(pandasDF2)

# Default - displays 20 rows and
# 20 characters from each column value
df.show()

#Display full column contents
df.show(truncate=False)

# Display 2 rows and full column contents
df.show(2,truncate=False)

# Display 2 rows & column values 25 characters
df.show(2,truncate=25)

# Display DataFrame rows & columns vertically
df.show(n=3,truncate=25,vertical=True)

In [None]:
#Create DataFrame with struct using Row class
from pyspark.sql import Row
from pyspark.sql.functions import col  # needed for col("prop.hair") below
data=[Row(name="James",prop=Row(hair="black",eye="blue")),
      Row(name="Ann",prop=Row(hair="grey",eye="black"))]
df=spark.createDataFrame(data)
df.printSchema()
#root
# |-- name: string (nullable = true)
# |-- prop: struct (nullable = true)
# |    |-- hair: string (nullable = true)
# |    |-- eye: string (nullable = true)

#Access struct column
df.select(df.prop.hair).show()
df.select(df["prop.hair"]).show()
df.select(col("prop.hair")).show()

#Access all columns from struct
df.select(col("prop.*")).show()


Column Functions:

Arithmetic(),
alias(),
isin(),
asc(),
desc(),
contains(),
between(),
cast(),
like(),
substring(),
when() & otherwise()

**select**() function:

select() is used to select a single column, multiple columns, columns by index, all columns from a list, and nested columns from a DataFrame. It is a transformation, so it returns a new DataFrame with the selected columns.

collect():

collect() is an action on an RDD/DataFrame that returns all elements of the dataset to the driver.


collect () vs select ()

select() is a transformation that returns a new DataFrame and holds the columns that are selected whereas collect() is an action that returns the entire data set in an Array to the driver.

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col  # col is used by orderBy() below
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data=[(100,2,1),(200,3,4),(300,4,4)]
df=spark.createDataFrame(data).toDF("col1","col2","col3")

#Arithmetic operations
df.select(df.col1 + df.col2).show()
'''df.select(df.col1 - df.col2).show()
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()
df.select(df.col2 > df.col3).show()
df.select(df.col2 < df.col3).show()
df.select(df.col2 == df.col3).show()'''

data=[("James","Bond","100",None),
      ("Ann","Varsa","200",'F'),
      ("Tom Cruise","XXX","400",''),
      ("Tom Brand",None,"400",'M')]
columns=["fname","lname","id","gender"]
df=spark.createDataFrame(data,columns)
#show with alias() function
df.select(df.fname.alias("first_name"), \
          df.lname.alias("last_name")
   ).show(1)

df.select(expr(" fname ||','|| lname").alias("fullName")).show(1) #import expr
#asc, desc to sort in ascending and descending order respectively.
df.sort(df.fname.asc()).show()
df.sort(df.fname.desc()).show()
df.sort("fname", "lname", ascending=[True, False]) \
  .show()
#print('Ordery function')
df.orderBy(col("fname").asc(),col("lname").asc()).show(truncate=False)

print('Sort using spark ')

df.createOrReplaceTempView("EMP")
spark.sql("select fname,lname,id,gender from EMP ORDER BY fname asc").show(truncate=False)


#cast
df.select(df.fname,df.id.cast("int")).printSchema()
#between
df.filter(df.id.between(100,300)).show()
#contains
df.filter(df.fname.contains("Cruise")).show()
#startswith, endswith()
df.filter(df.fname.startswith("T")).show()
df.filter(df.fname.endswith("Cruise")).show()
#isNull & isNotNull
df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()
#like , rlike
# show() is the action that triggers the filter and prints the result
df.select(df.fname,df.lname,df.id) \
  .filter(df.fname.like("%om")).show()
#Substring
df.select(df.fname.substr(1,2).alias("substr")).show()
#when & otherwise
from pyspark.sql.functions import when
# Use isNull() for the null check: comparing a column with == None never matches
df.select(df.fname,df.lname,when(df.gender=="M","Male") \
              .when(df.gender=="F","Female") \
              .when(df.gender.isNull() ,"") \
              .otherwise(df.gender).alias("new_gender") \
    ).show()
#isin
li=["100","200"]
df.select(df.fname,df.lname,df.id) \
  .filter(df.id.isin(li)) \
  .show()

withColumn():

It is a DataFrame transformation used to change the value of an existing column, convert its datatype, create a new column, and more.


What is the difference between where and filter in PySpark?

In PySpark, filter() and where() both select rows that satisfy a condition. where() is an alias for filter(), so the two can be used interchangeably.

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr,col
from pyspark.sql.functions import sum,avg,max
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]
columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.withColumn("salary",col("salary").cast("Integer")).show()
#Update The Value of an Existing Column
df.withColumn("salary",col("salary")*100).show()
#Create a Column from an Existing
df.withColumn("CopiedColumn",col("salary")* -1).show()
#Rename Column
df.withColumnRenamed("gender","sex") \
  .show(truncate=False)
#drop column
df.drop("salary") \
  .show()
# Remove duplicates on selected columns using dropDuplicates()
dropDisDF = df.dropDuplicates(["gender","salary"])
print("Distinct count of gender & salary : "+str(dropDisDF.count()))
dropDisDF.show(truncate=False)

# Sort using spark SQL

df.createOrReplaceTempView("EMP")
# Query only columns that exist in this DataFrame
spark.sql("select firstname,lastname,gender,salary from EMP ORDER BY salary asc").show(truncate=False)


#filter
# Using equal condition
df.filter(df.salary == 3000).show(truncate=False)
# Using SQL Expression
df.filter("gender == 'M'").show()
# Filter multiple conditions
df.filter( (df.lastname  == "Smith") & (df.gender  == "M") ) \
    .show(truncate=False)
df.filter(df.firstname.like("%ber%")).show()

# Using groupBy().sum()
df.groupBy("gender").sum("salary").show(truncate=False)
# Using filter on aggregate data
df.groupBy("gender") \
    .agg(sum("salary").alias("sum_salary"), \
      avg("salary").alias("avg_salary"), \
      sum("salary").alias("sum_sal"), \
      max("salary").alias("max_sal")) \
    .where(col("sum_salary") >= 500) \
    .show(truncate=False)

# Register DataFrame as a temporary view
df.createOrReplaceTempView("employees")

# Using SQL Query
sql_string = """SELECT gender,
       SUM(salary) AS sum_salary,
       AVG(salary) AS avg_salary,
       SUM(salary) AS sum_sal,
       MAX(salary) AS max_sal
FROM employees
GROUP BY gender
HAVING SUM(salary) >= 1000"""

# Execute SQL query against the temporary view
df2 = spark.sql(sql_string)
df2.show()

+------+-----------+
|gender|sum(salary)|
+------+-----------+
|M     |11000      |
|F     |3999       |
+------+-----------+

+------+----------+------------------+-------+-------+
|gender|sum_salary|avg_salary        |sum_sal|max_sal|
+------+----------+------------------+-------+-------+
|M     |11000     |3666.6666666666665|11000  |4000   |
|F     |3999      |1999.5            |3999   |4000   |
+------+----------+------------------+-------+-------+

+------+----------+------------------+-------+-------+
|gender|sum_salary|        avg_salary|sum_sal|max_sal|
+------+----------+------------------+-------+-------+
|     M|     11000|3666.6666666666665|  11000|   4000|
|     F|      3999|            1999.5|   3999|   4000|
+------+----------+------------------+-------+-------+



JOIN():

How does a join work?

PySpark’s join operation combines data from two or more datasets based on a common column or key. It is a fundamental operation in PySpark and is similar to SQL joins.
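
The semantics of an inner join versus a left join can be modeled in plain Python. This is only a sketch of what the result contains, not of how Spark executes a join: the `join` helper is hypothetical, and the rows are an abbreviated, made-up version of employee and department data.

```python
emp = [
    {"emp_id": 1, "name": "Smith", "emp_dept_id": 10},
    {"emp_id": 2, "name": "Rose", "emp_dept_id": 20},
    {"emp_id": 6, "name": "Brown", "emp_dept_id": 50},  # no matching department
]
dept = [
    {"dept_name": "Finance", "dept_id": 10},
    {"dept_name": "Marketing", "dept_id": 20},
    {"dept_name": "Sales", "dept_id": 30},
]


def join(left, right, left_key, right_key, how="inner"):
    """Combine rows whose keys match; how is 'inner' or 'left'."""
    # Index the right side by key so each left row can find its matches.
    index = {}
    for r in right:
        index.setdefault(r[right_key], []).append(r)
    # Placeholder used for unmatched left rows in a left join.
    null_right = {k: None for k in right[0]}

    result = []
    for l in left:
        matches = index.get(l[left_key], [])
        for r in matches:
            result.append({**l, **r})
        if not matches and how == "left":
            result.append({**l, **null_right})
    return result


inner_rows = join(emp, dept, "emp_dept_id", "dept_id", how="inner")
left_rows = join(emp, dept, "emp_dept_id", "dept_id", how="left")

print(len(inner_rows))  # 2 : Brown is dropped
print(len(left_rows))   # 3 : Brown is kept with None for the department columns
```

An inner join keeps only rows with a match on both sides, while a left join keeps every row from the left side and fills the right-side columns with nulls where nothing matches.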


In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)


dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner") \
     .show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left") \
    .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right") \
   .show(truncate=False)

empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)

empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# show() returns None, so keep the DataFrame and call show() separately
joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(truncate=False)

joinDF2 = spark.sql("select * from EMP e LEFT JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(truncate=False)

Map() & FlatMap()

map() in PySpark is a transformation that applies a function/lambda to each element of an RDD (Resilient Distributed Dataset) and returns a new RDD consisting of the results.


flatMap() in PySpark is a transformation that applies a function to every element and then flattens the results, returning a new RDD. PySpark DataFrames do not have flatMap(); for array/map columns, explode() gives the equivalent flattening.
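
The difference between the two is easiest to see on a small list. The sketch below uses plain Python lists rather than RDDs; `py_map` and `py_flat_map` are hypothetical helpers that mimic the behaviour of the two transformations on a single machine.

```python
from itertools import chain

lines = ["Project Gutenberg's", "Alice's Adventures in Wonderland"]


def py_map(fn, data):
    # One output element per input element.
    return [fn(x) for x in data]


def py_flat_map(fn, data):
    # fn returns an iterable per element; the results are flattened one level.
    return list(chain.from_iterable(fn(x) for x in data))


mapped = py_map(lambda s: s.split(" "), lines)
flattened = py_flat_map(lambda s: s.split(" "), lines)

print(len(mapped))     # 2 : one list of words per input line
print(len(flattened))  # 6 : every word is its own element
print(flattened[:2])   # ['Project', "Gutenberg's"]
```

With map() the result keeps one entry per input line, so splitting produces a list of lists; with flatMap() the inner lists are merged into a single flat sequence of words.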


In [None]:
# Imports
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
    .appName("SparkByExamples.com").getOrCreate()

data = ["Project","Gutenberg’s","Alice’s","Adventures",
"in","Wonderland","Project","Gutenberg’s","Adventures",
"in","Wonderland","Project","Gutenberg’s"]

rdd=spark.sparkContext.parallelize(data)
# map() with rdd
rdd2=rdd.map(lambda x: (x,1))
#for element in rdd2.collect():
 #   print(element)
rdd3=rdd2.reduceByKey(lambda x,y:x+y)
rdd3.collect()



[('Alice’s', 1),
 ('Gutenberg’s', 3),
 ('Adventures', 2),
 ('in', 2),
 ('Wonderland', 2),
 ('Project', 3)]

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
    .appName("SparkByExamples.com").getOrCreate()

data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)
#for element in rdd.collect():
  #  print(element)

rdd2=rdd.flatMap(lambda x: x.split(" "))
for element in rdd2.collect():
    print(element)

Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s


EXPLODE Function():

In [None]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark-by-examples').getOrCreate()

arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})]
df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])

from pyspark.sql.functions import explode
df2 = df.select(df.name,explode(df.knownLanguages))
df2.printSchema()
df2.show()

In [None]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com') \
                    .getOrCreate()

# Prepare Data
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

# Create DataFrame
df = spark.createDataFrame(data=data,schema=columns)
df.show()

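# foreach() example: the function receives one Row at a time and runs on the
# executors, so its printed output may not appear in the notebook cell.
def f(row):
    print(row.Seqno)
df.foreach(f)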

+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+



Read files (CSV, Parquet, Excel, text, etc.)

In [None]:
# Import
import pyspark
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.getOrCreate()

# Read CSV File
#df = spark.read.csv("sample_data/mnist_test.csv")
#df.write.csv("sample_data/mnist_test1.csv")
#df3 = spark.read.options(header='True', inferSchema='True', delimiter=',') \
 # .csv("sample_data/mnist_test.csv")
df4 = spark.read.json("sample_data/anscombe.json")
df4.printSchema()
df4.show()

**Broadcast Join**

Broadcast join is an optimization technique in the Spark SQL engine that is used to join two DataFrames. This technique is ideal for joining a large DataFrame with a smaller one. Traditional (shuffle-based) joins take longer because both sides must be shuffled across the network so that rows with matching keys end up in the same partition.

* The primary goal of a broadcast join is to eliminate data shuffling and network overhead associated with join operations, which can result in considerable speed benefits.

* A broadcast join sends the smaller table (or DataFrame) to all worker nodes, ensuring each worker node has a complete copy of the smaller table in memory.

**Types of Broadcast join.**

There are two types of broadcast joins.

* Broadcast hash join:
The driver builds an in-memory hash table from the smaller DataFrame and distributes it to the executors.
* Broadcast nested loop join:
A nested for-loop join. It is well suited to non-equi joins or coalescing joins.
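
The broadcast hash join idea can be sketched in plain Python. This is a single-machine model under simplifying assumptions, not Spark's implementation: `join_partition` is a hypothetical helper, the lists of tuples stand in for DataFrames, and the nested list stands in for the partitions of the larger table.

```python
# Smaller table: (product_id, product_name, price)
products = [(101, "Learn C++", 10), (102, "Mobile: X1", 20)]

# Larger table, already split into partitions: (order_id, product_id, quantity)
sales_partitions = [
    [(1, 101, 2), (2, 102, 1)],
    [(3, 103, 3), (4, 101, 1)],
]

# Step 1: build a hash table from the smaller table once.
# In Spark this copy would be sent ("broadcast") to every executor.
hash_table = {pid: (name, price) for pid, name, price in products}


def join_partition(partition, lookup):
    """Probe the hash table for each row; runs independently per partition."""
    out = []
    for order_id, pid, qty in partition:
        if pid in lookup:  # inner join: unmatched rows are dropped
            name, price = lookup[pid]
            out.append((order_id, pid, qty, name, price))
    return out


# Step 2: each partition of the larger table is joined where it already lives,
# so the larger table's rows never need to be shuffled.
result = [row for part in sales_partitions for row in join_partition(part, hash_table)]

for row in result:
    print(row)
```

Only the small table is copied around; each partition of the large table is joined in place, which is where the savings over a shuffle-based join come from.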

In [None]:
#Enable broadcast Join and
#Set Threshold limit of size in bytes of a dataFrame to broadcast
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
sc=spark.sparkContext

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)

#Disable broadcast Join.
#spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# A small in-memory dataset stands in for the larger table here

# Create DataFrames from sample data
sales_data = [(1, 101, 2), (2, 102, 1), (3, 103, 3), (4, 101, 1), (5, 104, 4)]
products_data = [(101, "Learn C++", 10), (102, "Mobile: X1", 20), (103, "LCD", 30), (104, "Laptop", 40)]

sales_columns = ["order_id", "product_id", "quantity"]
products_columns = ["product_id", "product_name", "price"]

sales_df = spark.createDataFrame(sales_data, schema=sales_columns)
products_df = spark.createDataFrame(products_data, schema=products_columns)

# Perform broadcast join
result = sales_df.join(broadcast(products_df), sales_df["product_id"] == products_df["product_id"])

# Show result
result.show()

+--------+----------+--------+----------+------------+-----+
|order_id|product_id|quantity|product_id|product_name|price|
+--------+----------+--------+----------+------------+-----+
|       1|       101|       2|       101|   Learn C++|   10|
|       2|       102|       1|       102|  Mobile: X1|   20|
|       3|       103|       3|       103|         LCD|   30|
|       4|       101|       1|       101|   Learn C++|   10|
|       5|       104|       4|       104|      Laptop|   40|
+--------+----------+--------+----------+------------+-----+

