# How to install PySpark in Google Colab, join tables, and more

Hello everyone, today we are going to discuss how to install PySpark in Colab and explain
how to use a broadcast join and how to merge more than two tables in PySpark.

When you work with Big Data, one of the most common tasks is to **merge different tables into a single table**.
This happens when you have different sources and your company asks you for a single table in your Data Warehouse (**DWH**). For this project I will use Google Colab, and I will explain how to install PySpark, how to create different tables, and how to merge them into a single table.

## Step 1. Mounting your Google Drive. 

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Step 2. Installing Java
Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run. Therefore, our first task is to install Java.

In [6]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

## Step 3. Installing Apache Spark

Next, we will install Apache Spark 3.3.0 with Hadoop 3 from [here](http://spark.apache.org/downloads.html).

In [18]:
!wget -q https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz

We list the downloaded files:

In [20]:
!ls

drive  sample_data  spark-3.3.0-bin-hadoop3.tgz


Now, we just need to extract that archive.

In [21]:
!tar xf spark-3.3.0-bin-hadoop3.tgz

There is one last thing that we need to install and that is the [findspark](https://pypi.org/project/findspark/) library. It will locate Spark on the system and import it as a regular library.

In [22]:
!pip install -q findspark

Next, we set the environment variables that tell PySpark where Java and Spark are located in the Colab environment.

In [23]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

We need to locate Spark in the system. For that, we import findspark and use the findspark.init() method.

In [24]:
import findspark
findspark.init()


# Data Science with Pyspark

Run the code below to download the compressed dataset archive, data.zip.

In [None]:
!wget https://github.com/ruslanmv/Data-Science-in-Jupyter-Notebook/raw/master/data.zip


--2022-06-25 13:22:11--  https://github.com/ruslanmv/Data-Science-in-Jupyter-Notebook/raw/master/data.zip
Resolving github.com (github.com)... 52.69.186.44
Connecting to github.com (github.com)|52.69.186.44|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/ruslanmv/Data-Science-in-Jupyter-Notebook/master/data.zip [following]
--2022-06-25 13:22:11--  https://raw.githubusercontent.com/ruslanmv/Data-Science-in-Jupyter-Notebook/master/data.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 28337418 (27M) [application/zip]
Saving to: ‘data.zip’


2022-06-25 13:22:12 (146 MB/s) - ‘data.zip’ saved [28337418/28337418]



You can then unzip the archive using the zipfile module.

In [None]:
import zipfile
# Unzip the dataset
local_zip = './data.zip'
zip_ref = zipfile.ZipFile(local_zip, 'r')
zip_ref.extractall('./')
zip_ref.close()


**What is PySpark?**

PySpark is the Python interface to Apache Spark. It lets you work with
Spark through APIs written in Python, and it supports Spark features
such as Spark DataFrame, Spark SQL, Spark Streaming, Spark MLlib and
Spark Core.

**What is PySpark SparkContext?**

The PySpark SparkContext is the initial entry point to Spark
functionality. It represents the connection to the Spark cluster and can
be used to create Spark RDDs (Resilient Distributed Datasets) and to
broadcast variables on the cluster.

Now, we can import [SparkSession](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html?highlight=sparksession#pyspark.sql.SparkSession) from [pyspark.sql](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html?highlight=sparksession#pyspark-sql-module) and create a SparkSession, which is the entry point to Spark.

You can give a name to the session using appName() and add some configurations with config() if you wish.

In [25]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

Finally, print the SparkSession variable.

In [27]:
spark

Before going into more detail about PySpark, let us merge two DataFrames.

## Merge two DataFrames in PySpark

The PySpark union() and unionAll() transformations merge two or more DataFrames that have the same schema or structure.

First, let’s create two DataFrames with the same schema.

In [30]:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('App').getOrCreate()

simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000) \
  ]

columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+



Second DataFrame

In [31]:

simpleData2 = [("James","Sales","NY",90000,34,10000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns2= ["employee_name","department","state","salary","age","bonus"]

df2 = spark.createDataFrame(data = simpleData2, schema = columns2)

df2.printSchema()
df2.show(truncate=False)


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



## Merge two or more DataFrames using union

In [32]:
unionDF = df.union(df2)
unionDF.show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



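Next, we create three DataFrames with columns ‘a’ and ‘b’ holding some sample values, pass all three to the unionAll() helper defined below, and show the resulting DataFrame. The helper selects the columns of each DataFrame in the order of the first one, so the union works even when the column order differs.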

In [29]:
# import modules
from pyspark.sql import SparkSession
import functools

# explicit function
def unionAll(dfs):
	return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)


df1 = spark.createDataFrame([[1, 1], [2, 2]], ['a', 'b'])

# different column order.
df2 = spark.createDataFrame([[3, 333], [4, 444]], ['b', 'a'])
df3 = spark.createDataFrame([[555, 5], [666, 6]], ['b', 'a'])

unioned_df = unionAll([df1, df2, df3])
unioned_df.show()


+---+---+
|  a|  b|
+---+---+
|  1|  1|
|  2|  2|
|333|  3|
|444|  4|
|  5|555|
|  6|666|
+---+---+
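
The select(df1.columns) call in unionAll() matters because union() pairs columns by position, not by name. The plain-Python sketch below imitates that behaviour with lists of tuples. The helper names union_by_position and union_by_name are hypothetical and are not part of the PySpark API.

```python
# Plain-Python illustration (no Spark needed) of positional vs. by-name union.
# Each "table" is a (columns, rows) pair; the helper names are hypothetical.

def union_by_position(left, right):
    """Append right's rows unchanged, pairing columns by position."""
    cols, rows = left
    return cols, rows + right[1]

def union_by_name(left, right):
    """Reorder right's values to left's column order before appending."""
    cols, rows = left
    r_cols, r_rows = right
    idx = [r_cols.index(c) for c in cols]
    return cols, rows + [tuple(row[i] for i in idx) for row in r_rows]

t1 = (["a", "b"], [(1, 1), (2, 2)])
t2 = (["b", "a"], [(3, 333), (4, 444)])

print(union_by_position(t1, t2)[1])  # values of b end up under a
print(union_by_name(t1, t2)[1])      # values follow their column names
```

In PySpark itself, df1.unionByName(df2) performs the by-name variant directly.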



## PySpark Broadcast Join

A broadcast join is a join between a large DataFrame and a much smaller one. The smaller DataFrame is broadcast, that is, copied in full to every node of the cluster, so that it can be reused there.

Because each node already holds the small DataFrame, the rows of the large DataFrame do not have to be shuffled across the network to perform the join.
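
To see why broadcasting avoids a shuffle, here is a minimal plain-Python sketch of the idea. It is not Spark's actual implementation: the small table is turned into a lookup dictionary that every partition of the large table receives in full, so each partition can be joined locally. The function name broadcast_hash_join and the sample rows are made up for illustration.

```python
from collections import defaultdict

def broadcast_hash_join(large_partitions, small_rows, key):
    """Join each partition of the large table against a full copy of the small one."""
    # Build the lookup once; conceptually this is what gets shipped to every node.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)

    joined = []
    for partition in large_partitions:       # each partition is handled independently
        for row in partition:
            for match in lookup.get(row[key], []):
                joined.append((row, match))  # inner join: unmatched rows are dropped
    return joined

small = [{"Add": "USA", "Name": "Jhon"}, {"Add": "IND", "Name": "Tina"}]
large = [
    [{"Add": "USA", "ID": 1}, {"Add": "rsa", "ID": 2}],  # partition 0
    [{"Add": "IND", "ID": 3}],                           # partition 1
]

result = broadcast_hash_join(large, small, "Add")
print(len(result))  # number of matching pairs
```

No row of the large table ever has to leave its partition, which is the property the broadcast join relies on.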

In [46]:
from pyspark.sql.functions import broadcast

In [37]:
spark = SparkSession.builder.appName('App').getOrCreate()
sc=spark.sparkContext

In [38]:
data1 = [{'Name':'Jhon','ID':21.528,'Add':'USA'},{'Name':'Joe','ID':3.69,'Add':'USA'},{'Name':'Tina','ID':2.48,'Add':'IND'},{'Name':'Jhon','ID':22.22, 'Add':'USA'},{'Name':'Joe','ID':5.33,'Add':'INA'}]
a = sc.parallelize(data1)

RDD is created using sc.parallelize.

In [39]:
b = spark.createDataFrame(a)
b.show()

+---+------+----+
|Add|    ID|Name|
+---+------+----+
|USA|21.528|Jhon|
|USA|  3.69| Joe|
|IND|  2.48|Tina|
|USA| 22.22|Jhon|
|INA|  5.33| Joe|
+---+------+----+



Let us create the other data frame with data2. 

In [42]:
data2 = [{'Name':'Jhon','ID':21.528,'Add':'USA'},{'Name':'Joe','ID':3.69,'Add':'USeA'},{'Name':'Tina','ID':2.48,'Add':'IND'},{'Name':'Jhon','ID':22.22, 'Add':'USdA'},{'Name':'Joe','ID':5.33,'Add':'rsa'}]

In [43]:
c = sc.parallelize(data2)

In [44]:
d = spark.createDataFrame(c)

Let us broadcast the first DataFrame. The broadcast() function marks a DataFrame as small enough to be sent to every node.

In [47]:
e = broadcast(b)

Let us now join both DataFrames on a particular column. Because one side is broadcast, this avoids shuffling data across the network in the PySpark application.

In [48]:
f = d.join(broadcast(e),d.Add == e.Add)

In [49]:
f.show()

+---+------+----+---+------+----+
|Add|    ID|Name|Add|    ID|Name|
+---+------+----+---+------+----+
|USA|21.528|Jhon|USA| 22.22|Jhon|
|USA|21.528|Jhon|USA|  3.69| Joe|
|USA|21.528|Jhon|USA|21.528|Jhon|
|IND|  2.48|Tina|IND|  2.48|Tina|
+---+------+----+---+------+----+



We can also join on other columns, and the result can be used to create a new DataFrame.

In [50]:
f = d.join(broadcast(e),d.Name == e.Name)
f.show()

+----+------+----+---+------+----+
| Add|    ID|Name|Add|    ID|Name|
+----+------+----+---+------+----+
| USA|21.528|Jhon|USA| 22.22|Jhon|
| USA|21.528|Jhon|USA|21.528|Jhon|
|USeA|  3.69| Joe|INA|  5.33| Joe|
|USeA|  3.69| Joe|USA|  3.69| Joe|
| IND|  2.48|Tina|IND|  2.48|Tina|
|USdA| 22.22|Jhon|USA| 22.22|Jhon|
|USdA| 22.22|Jhon|USA|21.528|Jhon|
| rsa|  5.33| Joe|INA|  5.33| Joe|
| rsa|  5.33| Joe|USA|  3.69| Joe|
+----+------+----+---+------+----+



Export a PySpark DataFrame to CSV:

In [52]:
f.toPandas().to_csv('data.csv')

## Loading data into PySpark

We need to load the dataset. We will use the [read.csv](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html?highlight=sparksession#pyspark.sql.DataFrameReader.csv) method. The **inferSchema** parameter lets Spark determine the data type of each column automatically, but Spark then has to make an extra pass over the data. If you don’t want that to happen, you can instead provide the schema explicitly in the **schema** parameter.

In [53]:
df = spark.read.csv("data.csv", header=True, inferSchema=True)

Now let’s get started with PySpark in more detail!

**What is spark-submit?**

Spark-submit is a utility to run a pyspark application job by specifying
options and configurations.

In [None]:
spark-submit \
    --master <master-url> \
    --deploy-mode <deploy-mode> \
    --conf <key>=<value> \
    --driver-memory <value>g \
    --executor-memory <value>g \
    --executor-cores <number of cores> \
    --jars <comma separated dependencies> \
    --packages <package name> \
    --py-files <comma separated .zip, .egg or .py files> \
    <application> <application args>


where

--master: Cluster manager (yarn, mesos, Kubernetes, local, local[k])  
--deploy-mode: Either cluster or client  
--conf: Runtime configurations, shuffle parameters and application
configurations can be provided using --conf. Ex: --conf
spark.sql.shuffle.partitions=300  
--driver-memory: Amount of memory to allocate for the driver (Default:
1024M).  
--executor-memory: Amount of memory to use for the executor process.  
--executor-cores: Number of CPU cores to use for the executor process.
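
As an illustration, the options above can be assembled into a concrete command. The sketch below only builds and prints the command line, it does not run it. The script name my_job.py, its argument and the resource values are placeholders, not recommendations.

```python
import shlex

# Hypothetical values chosen only to show where each option goes.
command = [
    "spark-submit",
    "--master", "local[2]",
    "--deploy-mode", "client",
    "--conf", "spark.sql.shuffle.partitions=300",
    "--driver-memory", "2g",
    "--executor-memory", "2g",
    "--executor-cores", "2",
    "my_job.py",   # placeholder application script
    "input.csv",   # placeholder application argument
]

# shlex.join quotes each argument safely for a POSIX shell (Python 3.8+).
print(shlex.join(command))
```

Keeping the command as a list makes it easy to pass to subprocess.run(command) later without worrying about shell quoting.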

**What are RDDs in PySpark?**

RDD stands for Resilient Distributed Dataset. RDDs are the elements
that run and operate on multiple nodes to perform parallel processing
on a cluster. RDDs are immutable: once we create an RDD, we cannot
modify it, and every transformation produces a new RDD instead. RDDs
are also fault-tolerant, which means that whenever a failure happens,
they can be recovered automatically. Multiple operations can be
performed on RDDs to accomplish a certain task.

-   Data Representation: An RDD is a distributed collection of data
    elements without any schema.
-   Optimization: There is no built-in optimization engine for RDDs.
-   Schema: We need to define the schema manually.
-   Aggregation Operation: RDDs are slower than both DataFrames and
    Datasets at simple operations like grouping the data.
**Creation of RDD using textFile API**

In [None]:
rdd = spark.sparkContext.textFile('practice/test')
rdd.take(5)
for i in rdd.take(5):
    print(i)


**Get the Number of Partitions in the RDD**

In [None]:
rdd.getNumPartitions()


**Get the Number of elements in each partition**

In [None]:
rdd.glom().map(len).collect()


**Create RDD using textFile API and a defined number of partitions**

In [None]:
rdd = spark.sparkContext.textFile('practice/test', 10)


**Create a RDD from a Python List**

In [None]:
lst = [1, 2, 3, 4, 5, 6, 7]
rdd = spark.sparkContext.parallelize(lst)
for i in rdd.take(5):
    print(i)


**Create a RDD from local file**

In [None]:
lst = open('/staging/test/sample.txt').read().splitlines()
lst[0:10]
rdd = spark.sparkContext.parallelize(lst)
for i in rdd.take(5):
    print(i)


**Create RDD from range function**

In [None]:
lst1 = range(10)
rdd = spark.sparkContext.parallelize(lst1)
for i in rdd.take(5):
    print(i)


**Create RDD from a DataFrame**

In [None]:
df = spark.createDataFrame(
    data=(('robert', 35), ('Mike', 45)), schema=('name', 'age'))
df.printSchema()
df.show()
rdd1 = df.rdd
type(rdd1)
for i in rdd1.take(2):
    print(i)


**What are Dataframes?**

DataFrames were first introduced in Spark version 1.3 to overcome the
limitations of the Spark RDD. A Spark DataFrame is a distributed
collection of data points in which the data is organized into named
columns.

-   Data Representation: A distributed collection organized into named
    columns.
-   Optimization: It uses the Catalyst optimizer for optimization.
-   Schema: It will automatically find out the schema of the dataset.
-   Aggregation Operation: It performs aggregation faster than both RDDs
    and Datasets.

**What are Datasets?**

Spark Datasets are an extension of the DataFrame API with the benefits
of both RDDs and DataFrames. They are fast and provide a type-safe
interface. Note that the typed Dataset API is available in Scala and
Java; in Python you work with DataFrames.

-   Data Representation: An extension of DataFrames with more features,
    like type-safety and an object-oriented interface.
-   Optimization: It uses the Catalyst optimizer for optimization.
-   Schema: It will automatically find out the schema of the dataset.
-   Aggregation Operation: A Dataset is faster than RDDs but a bit slower
    than DataFrames.

**What types of operations does PySpark have?**

The operations are of two types: transformations and actions.

**What is a Transformation in PySpark?**

Transformation: These operations, when applied to an RDD, result in the
creation of a new RDD. Some examples of transformation operations are
filter, groupBy and map. Let us demonstrate a transformation by
considering the filter() operation:

In [None]:

from pyspark import SparkContext
# Reuse the running SparkContext; creating a second one raises an error
sc = SparkContext.getOrCreate()
words_list = sc.parallelize(
    ["pyspark",
     "interview",
     "questions"]
)
filtered_words = words_list.filter(lambda x: 'interview' in x)
filtered = filtered_words.collect()
print(filtered)


The output of the above code would be:

['interview']
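
One detail worth knowing is that Spark evaluates transformations lazily: filter() only records what should happen, and the work is done when an action such as collect() is called. The following plain-Python analogy uses a generator to mimic that behaviour. It is not Spark code, and the calls list exists only to make the evaluation order visible.

```python
calls = []  # records when the predicate actually runs

def contains_interview(word):
    calls.append(word)
    return "interview" in word

words = ["pyspark", "interview", "questions"]

# "Transformation": builds a lazy pipeline, the predicate is not evaluated yet.
filtered_words = (w for w in words if contains_interview(w))
print(calls)     # [] -> nothing has been computed so far

# "Action": consuming the pipeline triggers the computation.
filtered = list(filtered_words)
print(filtered)  # ['interview']
print(calls)     # ['pyspark', 'interview', 'questions']
```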


**What is an Action in PySpark?**

Action: These operations instruct Spark to perform some computation on
the RDD and return the result to the driver. An action sends data from
the executors to the driver. count(), collect() and take() are some
examples. Let us demonstrate an action by making use of the count()
function.

In [None]:

from pyspark import SparkContext
# Reuse the running SparkContext; creating a second one raises an error
sc = SparkContext.getOrCreate()
words = sc.parallelize(
    ["pyspark",
     "interview",
     "questions"]
)
counts = words.count()
print("Count of elements in RDD -> ",  counts)


Here we count the number of elements in the RDD. The output of this
code is:

Count of elements in RDD ->  3

## Creating DataFrame


**From RDDs**

In [None]:
from pyspark.sql import Row
from pyspark.sql.types import *


**Infer Schema**

In [None]:
sc = spark.sparkContext
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
peopledf = spark.createDataFrame(people)



**Specify Schema**

In [None]:

# Every field is declared as StringType below, so age is kept as a string here
people = parts.map(lambda p: Row(name=p[0],
                                 age=p[1].strip()))
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for
          field_name in schemaString.split()]
schema = StructType(fields)
spark.createDataFrame(people, schema).show()


**From Spark Data Sources**

**JSON**

In [None]:
df = spark.read.json("customer.json")
df.show()


In [None]:

df2 = spark.read.load("people.json", format="json")



**Parquet files**

In [None]:

df3 = spark.read.load("people.parquet")



**TXT files**

In [None]:
df4 = spark.read.text("people.txt")


### Filter

Filter on age: keep only those records whose age is greater than 24.

In [None]:
df.filter(df["age"] > 24).show()


**Duplicate Values**

In [None]:
df = df.dropDuplicates()


### Queries

**What is PySpark SQL?** PySpark SQL is one of the most widely used
PySpark modules for processing structured, columnar data. Once a
DataFrame is created, we can interact with the data using SQL syntax.
Spark SQL lets us run native SQL queries on Spark, using select, where,
group by, join, union etc. To use PySpark SQL, the first step is to
register the DataFrame as a temporary view with
createOrReplaceTempView().

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import explode


**Select**

In [None]:

df.select("firstName").show()  # Show all entries in the firstName column
df.select("firstName", "lastName") \
    .show()
# Show all entries in firstName, age and type
df.select("firstName",
          "age",
          explode("phoneNumber")
          .alias("contactInfo")) \
    .select("contactInfo.type",
            "firstName",
            "age") \
    .show()
# Show all entries in firstName and age, add 1 to the entries of age
df.select(df["firstName"], df["age"] + 1) \
    .show()
# Show TRUE or FALSE for each entry depending on whether age > 24
df.select(df["age"] > 24).show()



**When**

In [None]:

df.select("firstName",  # Show firstName and 0 or 1 depending on age >30
          F.when(df.age > 30, 1) \
          .otherwise(0)) \
    .show()
# Show firstName if in the given options
df[df.firstName.isin("Jane", "Boris")] \
    .collect()



**Like**

In [None]:

# Show firstName, and TRUE if lastName is like Smith
df.select("firstName",
          df.lastName.like("Smith")) \
    .show()



**Startswith** - **Endswith**

In [None]:
df.select("firstName",  # Show firstName, and TRUE if lastName starts with Sm
          df.lastName \
          .startswith("Sm")) \
    .show()
# Show TRUE for last names ending in th
df.select(df.lastName.endswith("th")) \
    .show()


**Substring**

In [None]:
df.select(df.firstName.substr(1, 3)  # Return substrings of firstName
          .alias("name")) \
    .collect()


**Between**

In [None]:
# Show age: values are TRUE if between 22 and 24
df.select(df.age.between(22, 24)) \
    .show()


### Add, Update & Remove Columns

**Adding Columns**

In [None]:
df = df.withColumn('city', df.address.city) \
    .withColumn('postalCode', df.address.postalCode) \
    .withColumn('state', df.address.state) \
    .withColumn('streetAddress', df.address.streetAddress) \
    .withColumn('telePhoneNumber', explode(df.phoneNumber.number)) \
    .withColumn('telePhoneType', explode(df.phoneNumber.type))


**Updating Columns**

In [None]:
df = df.withColumnRenamed('telePhoneNumber', 'phoneNumber')


**Removing Columns**

In [None]:
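# Either drop the columns by name ...
df = df.drop("address", "phoneNumber")
# ... or, equivalently, by Column object (use one of the two, not both)
# df = df.drop(df.address).drop(df.phoneNumber)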


### Missing & Replacing Values

In [None]:

df.na.fill(50).show()  # Replace null values
df.na.drop().show()  # Return new df omitting rows with null values
# Return new df replacing one value with another
df.na \
    .replace(10, 20) \
    .show()



### GroupBy

In [None]:

# Group by age, count the members in the groups
df.groupBy("age") \
    .count() \
    .show()



### Sort

In [None]:

peopledf.sort(peopledf.age.desc()).collect()
df.sort("age", ascending=False).collect()
df.orderBy(["age", "city"], ascending=[0, 1])\
    .collect()



### Repartitioning

In [None]:

df.repartition(10) \
    .rdd \
    .getNumPartitions()  # df with 10 partitions
df.coalesce(1).rdd.getNumPartitions()  # df with 1 partition



### Running Queries Programmatically

In [None]:
peopledf.createGlobalTempView("people")
df.createTempView("customer")
df.createOrReplaceTempView("customer")


**Query Views**

In [None]:

df5 = spark.sql("SELECT * FROM customer")
df5.show()
peopledf2 = spark.sql("SELECT * FROM global_temp.people")
peopledf2.show()



### **Inspect Data**

In [None]:

df.dtypes  # Return df column names and data types
df.show()  # Display the content of df
df.head()  # Return the first row, or the first n rows with head(n)
df.first()  # Return first row
df.take(2)  # Return the first n rows
df.schema  # Return the schema of df
df.describe().show()  # Compute summary statistics
df.columns  # Return the columns of df
df.count()  # Count the number of rows in df
df.distinct().count()  # Count the number of distinct rows in df
df.printSchema()  # Print the schema of df
df.explain()  # Print the (logical and physical) plans



### Output

**Data Structures**

In [None]:
rdd1 = df.rdd  # Convert df into an RDD
df.toJSON().first()  # Convert df into an RDD of strings
df.toPandas()  # Return the contents of df as a pandas DataFrame


**Write** & **Save to Files**

In [None]:

df.select("firstName", "city")\
    .write \
    .save("nameAndCity.parquet")
df.select("firstName", "age") \
    .write \
    .save("namesAndAges.json", format="json")



### **Stopping SparkSession**

In [None]:
spark.stop()


## **PySpark RDD**

PySpark is the Spark Python API that exposes the Spark programming model
to Python.

**Inspect SparkContext**

In [None]:

sc.version  # Retrieve SparkContext version
sc.pythonVer  # Retrieve Python version
sc.master  # Master URL to connect to
str(sc.sparkHome)  # Path where Spark is installed on worker nodes
str(sc.sparkUser())  # Retrieve name of the Spark User running SparkContext
sc.appName  # Return application name
sc.applicationId  # Retrieve application ID
sc.defaultParallelism  # Return default level of parallelism
sc.defaultMinPartitions  # Default minimum number of partitions for RDDs



**Configuration**

In [None]:
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)


**Using The Shell**

In the PySpark shell, a special interpreter-aware SparkContext is
already created in the variable called sc.

In [None]:
$ ./bin/spark-shell --master local[2]
$ ./bin/pyspark --master local[4] --py-files code.py


Set which master the context connects to with the --master argument, and
add Python **.zip, .egg** or **.py** files to the runtime path by
passing a comma-separated list to --py-files.

### Loading Data

**Parallelized Collections**

In [None]:

rdd = sc.parallelize([('a', 7), ('a', 2), ('b', 2)])
rdd2 = sc.parallelize([('a', 2), ('d', 1), ('b', 1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([("a", ["x", "y", "z"]),
                       ("b", ["p", "r"])])



**External Data**

Read a text file from HDFS, a local file system or any
Hadoop-supported file system URI with textFile(), or read a directory
of text files with wholeTextFiles().

In [None]:

textFile = sc.textFile("/my/directory/*.txt")
textFile2 = sc.wholeTextFiles("/my/directory/")



### Retrieving RDD Information

**Basic Information**

In [None]:

rdd.getNumPartitions()  # List the number of partitions
rdd.count()  # Count RDD instances
3
rdd.countByKey()  # Count RDD instances by key
defaultdict(<class 'int'>, {'a': 2, 'b': 1})
rdd.countByValue()  # Count RDD instances by value
defaultdict(<class 'int'>, {('b', 2): 1, ('a', 2): 1, ('a', 7): 1})
rdd.collectAsMap()  # Return (key,value) pairs as a dictionary
{'a': 2, 'b': 2}
rdd3.sum()  # Sum of RDD elements
4950
sc.parallelize([]).isEmpty()  # Check whether RDD is empty
True



**Summary**

In [None]:

rdd3.max()  # Maximum value of RDD elements
99
rdd3.min()  # Minimum value of RDD elements
0
rdd3.mean()  # Mean value of RDD elements
49.5
rdd3.stdev()  # Standard deviation of RDD elements
28.866070047722118
rdd3.variance()  # Compute variance of RDD elements
833.25
rdd3.histogram(3)  # Compute histogram by bins
([0, 33, 66, 99], [33, 33, 34])
rdd3.stats()  # Summary statistics (count, mean, stdev, max & min)
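
The summary values for rdd3 can be cross-checked without Spark. To my knowledge, stdev() and variance() on an RDD are the population versions (Spark offers sampleStdev() and sampleVariance() separately), so the sketch below uses pstdev and pvariance from Python's statistics module on the same values as rdd3.

```python
import statistics

values = list(range(100))  # same elements as rdd3

mean = statistics.mean(values)
variance = statistics.pvariance(values)  # population variance
stdev = statistics.pstdev(values)        # population standard deviation

print(mean)      # 49.5
print(variance)  # 833.25
print(stdev)     # roughly 28.866
```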



### Applying Functions

In [None]:

# Apply a function to each RDD element
rdd.map(lambda x: x+(x[1], x[0])).collect()
[('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]
# Apply a function to each RDD element and flatten the result
rdd5 = rdd.flatMap(lambda x: x+(x[1], x[0]))
rdd5.collect()
['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']
# Apply a flatMap function to each (key,value) pair of rdd4 without changing the keys
rdd4.flatMapValues(lambda x: x).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
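
The difference between map() and flatMap() is easy to reproduce in plain Python: map keeps one output element per input element, while flatMap concatenates the outputs into a single flat sequence. A small analogy using itertools.chain, not Spark code (the helper name add_reversed is made up):

```python
from itertools import chain

pairs = [("a", 7), ("a", 2), ("b", 2)]  # same data as rdd

def add_reversed(x):
    # Append the reversed pair to the original tuple
    return x + (x[1], x[0])

# Like rdd.map(...): one tuple per input element
mapped = [add_reversed(x) for x in pairs]
# Like rdd.flatMap(...): the tuples are flattened into one list
flat_mapped = list(chain.from_iterable(add_reversed(x) for x in pairs))

print(mapped)       # [('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]
print(flat_mapped)  # ['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']
```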



### Selecting Data

**Getting**

In [None]:

rdd.collect()  # Return a list with all RDD elements
[('a', 7), ('a', 2), ('b', 2)]
rdd.take(2)  # Take first 2 RDD elements
[('a', 7), ('a', 2)]
rdd.first()  # Take first RDD element
('a', 7)
rdd.top(2)  # Take top 2 RDD elements
[('b', 2), ('a', 7)]



**Sampling**

In [None]:

rdd3.sample(False, 0.15, 81).collect()  # Return sampled subset of rdd3
[3, 4, 27, 31, 40, 41, 42, 43, 60, 76, 79, 80, 86, 97]



**Filtering**

In [None]:

rdd.filter(lambda x: "a" in x).collect()  # Filter the RDD
[('a', 7), ('a', 2)]
rdd5.distinct().collect()  # Return distinct RDD values
['a', 2, 'b', 7]
rdd.keys().collect()  # Return (key,value) RDD's keys
['a', 'a', 'b']


In [None]:
def g(x): print(x)


rdd.foreach(g)  # Apply a function to all RDD elements

('a', 7)
('b', 2)
('a', 2)


### Reshaping Data


**Reducing**

In [None]:
# Merge the rdd values for each key
rdd.reduceByKey(lambda x, y: x+y).collect()
[('a', 9), ('b', 2)]
rdd.reduce(lambda a, b: a + b)  # Merge the rdd values
('a', 7, 'a', 2, 'b', 2)

**Grouping by**

In [None]:

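# Group rdd3 elements by the result of a function (here: even or odd)
rdd3.groupBy(lambda x: x % 2) \
    .mapValues(list) \
    .collect()
# Group rdd values by key
rdd.groupByKey() \
    .mapValues(list) \
    .collect()
[('a', [7, 2]), ('b', [2])]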



**Aggregating**

In [None]:

from operator import add

seqOp = (lambda x, y: (x[0]+y, x[1]+1))
combOp = (lambda x, y: (x[0]+y[0], x[1]+y[1]))
# Aggregate RDD elements of each partition and then the results
rdd3.aggregate((0, 0), seqOp, combOp)
(4950, 100)
# Aggregate values of each RDD key
rdd.aggregateByKey((0, 0), seqOp, combOp).collect()
[('a', (9, 2)), ('b', (2, 1))]
# Aggregate the elements of each partition, and then the results
rdd3.fold(0, add)
4950
# Merge the values for each key
rdd.foldByKey(0, add).collect()
[('a', 9), ('b', 2)]
# Create tuples of RDD elements by applying a function
rdd3.keyBy(lambda x: x+x).collect()
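
To make the roles of seqOp and combOp concrete, here is a plain-Python sketch of what aggregate() computes. The split into four partitions of 25 elements is an arbitrary assumption for illustration; Spark decides the real partitioning.

```python
from functools import reduce

# Fold one element into a running (sum, count) pair
seqOp = lambda acc, value: (acc[0] + value, acc[1] + 1)
# Merge the (sum, count) pairs of two partitions
combOp = lambda a, b: (a[0] + b[0], a[1] + b[1])

values = list(range(100))
# Assumed partitioning: four chunks of 25 elements each.
partitions = [values[i:i + 25] for i in range(0, 100, 25)]

# Step 1: each partition is reduced independently with seqOp, starting from the zero value.
partials = [reduce(seqOp, part, (0, 0)) for part in partitions]
# Step 2: the per-partition results are merged with combOp.
total = reduce(combOp, partials, (0, 0))

print(partials)  # [(300, 25), (925, 25), (1550, 25), (2175, 25)]
print(total)     # (4950, 100)
```

Dividing the two components of the result, 4950 / 100, gives the mean 49.5.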



### Mathematical Operations

In [None]:
# Return each rdd value not contained in rdd2
rdd.subtract(rdd2).collect()
[('b', 2), ('a', 7)]
# Return each (key,value) pair of rdd2 with no matching key in rdd
rdd2.subtractByKey(rdd).collect()
[('d', 1)]
rdd.cartesian(rdd2).collect()  # Return the Cartesian product of rdd and rdd2

**Sort**

In [None]:
rdd2.sortBy(lambda x: x[1]).collect()  # Sort RDD by given function
[('d', 1), ('b', 1), ('a', 2)]
rdd2.sortByKey().collect()  # Sort (key, value) RDD by key
[('a', 2), ('b', 1), ('d', 1)]

**Repartitioning**

In [None]:

rdd.repartition(4)  # New RDD with 4 partitions
rdd.coalesce(1)  # Decrease the number of partitions in the RDD to 1



**Saving**

In [None]:

rdd.saveAsTextFile("rdd.txt")
rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",
                     'org.apache.hadoop.mapred.TextOutputFormat')



**Execution**

In [None]:
$ ./bin/spark-submit examples/src/main/python/pi.py


**Does PySpark provide a machine learning API?**

Similar to Spark, PySpark provides a machine learning API known as MLlib
that supports various ML algorithms, such as:

-   mllib.classification − This supports different methods for binary or
    multiclass classification and regression analysis, like Random
    Forest, Decision Tree, Naive Bayes etc.
-   mllib.clustering − This is used for solving clustering problems that
    aim to group subsets of entities with one another depending on
    similarity.
-   mllib.fpm − FPM stands for Frequent Pattern Mining. This library
    is used to mine frequent items, subsequences or other structures
    that are used for analyzing large datasets.
-   mllib.linalg − This is used for solving problems in linear algebra.
-   mllib.recommendation − This is used for collaborative filtering and
    in recommender systems.
-   spark.mllib − This is used for supporting model-based collaborative
    filtering where small latent factors are identified using the
    Alternating Least Squares (ALS) algorithm, which is used for
    predicting missing entries.
-   mllib.regression − This is used for solving problems using
    regression algorithms that find relationships and variable
    dependencies.

**Is PySpark faster than pandas?**

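PySpark supports parallel execution of statements in a distributed
environment, i.e. on different cores and different machines, which
pandas does not. This is why PySpark can be faster than pandas on large
datasets. On small data that fits comfortably in memory, the overhead of
distributing the work can make pandas the quicker option.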

**What is PySpark Architecture?**

PySpark, like Apache Spark, follows a master-slave architecture
pattern. Here, the master node is called the driver and the slave nodes
are called the workers. When a Spark application runs, the Spark
driver creates a SparkContext, which acts as the entry point to the
Spark application. All the operations are executed on the worker nodes.
The resources required for executing the operations on the worker nodes
are managed by the cluster manager.

**What is the common workflow of a spark program?**

The most common workflow followed by a Spark program is as follows. The
first step is to create input RDDs from external data, which can come
from different data sources. After the RDDs are created, transformation
operations like filter() or map() are run to create new RDDs according
to the business logic. If any intermediate RDDs need to be reused later,
we can persist those RDDs. Lastly, when an action such as first() or
count() is present, Spark launches it to initiate the parallel
computation.

**Congratulations!** You have read a small summary of some important
topics in **Data Science**.