### Creating a Spark session

The Spark session is the entry point to Spark and holds the application's configuration. Think of it as the main method of a program: everything else starts from it.


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

## File reading

- Spark can read a wide variety of file formats: JSON, CSV, text, Parquet, and Avro (Avro support ships as the separate `spark-avro` module).
- Spark has three data abstractions: RDD, DataFrame, and Dataset.
- RDD (Resilient Distributed Dataset): the original, lowest-level abstraction. It carries no schema, so every operation has to be programmed by hand.
- DataFrame: a very efficient data abstraction. Think of it as RDD + structure (named, typed columns). Structured data such as JSON, CSV, XML, and Parquet files can be read very easily.
- Dataset: a strongly typed variant of the DataFrame that combines the RDD's typed objects with the DataFrame's optimizations. The typed Dataset API exists only in Scala and Java; in PySpark you work with DataFrames.

### Parquet file format (just for reference)

- Parquet (pronounced "par-kay") is a columnar file format, an alternative to CSV, JSON, or XML. It stores data compressed and in binary form, so unlike a CSV you cannot open it in a text editor and read it.

## So what is the difference between a normal file read and Spark?

Spark processes data in parallel. When it reads data, it splits the data into partitions based on size and processes those partitions in parallel. How much actually runs at the same time depends on the capacity (cores) you allocate.



### DataFrame
- DataFrames are the most widely used abstraction. By abstraction I mean a wrapper around the data in memory.

- With a DataFrame, you can do most processing using the many pre-built methods that Spark ships with.

- You can also process the data with SQL queries.



## What kind of data processing is done in Spark?

Spark has two types of operations:
1. Transformations
2. Actions

**Transformations:** operations that produce a new DataFrame from an existing one, for example adding columns, updating values, dropping columns, or filtering rows. Transformations are lazy: Spark only records them and does not run anything yet.

**Actions:** operations that trigger the computation and produce an outcome, e.g. saving a file, getting the record count, or collecting all elements.


### What kind of work can Spark do?

- Spark can run batch workloads and streaming (near-real-time) workloads.

- Batch workload: processing a bounded set of data in one go, e.g. a job that processes all the files in a folder.

- Real-time workload: also called stream processing. Data arrives continuously and Spark processes it continuously. Think of a stream of water and a fishing net: the data keeps flowing and the Spark job processes it on the fly.



### File reading example (CSV file)

Either run

wget -P spark-demo https://raw.githubusercontent.com/srini-daruna/temp-repo/master/spark-demo/city_attributes.csv

or download the file directly from the link and place it in `./spark-demo/`, which is where the read call expects it.

In [2]:
df1 = (
    spark.read
    .option("header", "true")  # the first line of the file holds the column names
    .csv("./spark-demo/city_attributes.csv")
)


### Basic operations

Some basic operations to play with:

- printSchema: shows the structure (column names and types) of the data
- count: returns the total number of records
- show: like displaying a DataFrame in pandas. Prints a sample of the records, 20 by default; the number can be changed.
- collect: returns all records to the driver as a list

In [3]:
df1.show()

+-------------+-------------+---------+-----------+
|         City|      Country| Latitude|  Longitude|
+-------------+-------------+---------+-----------+
|    Vancouver|       Canada| 49.24966|-123.119339|
|     Portland|United States|45.523449|-122.676208|
|San Francisco|United States|37.774929|-122.419418|
|      Seattle|United States|47.606209|-122.332069|
|  Los Angeles|United States|34.052231|-118.243683|
|    San Diego|United States|32.715328|-117.157257|
|    Las Vegas|United States|36.174969|-115.137222|
|      Phoenix|United States| 33.44838|-112.074043|
|  Albuquerque|United States|35.084492|-106.651138|
|       Denver|United States|39.739151|-104.984703|
|  San Antonio|United States| 29.42412| -98.493629|
|       Dallas|United States|32.783058| -96.806671|
|      Houston|United States|29.763281| -95.363274|
|  Kansas City|United States|39.099731| -94.578568|
|  Minneapolis|United States|44.979969|  -93.26384|
|  Saint Louis|United States| 38.62727| -90.197891|
|      Chica

In [4]:
df1.count()

36

In [5]:
df1.printSchema()

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



### You can filter the data as shown below

In [12]:
df1.filter("City = 'Indianapolis'").show()

+------------+-------------+---------+----------+
|        City|      Country| Latitude| Longitude|
+------------+-------------+---------+----------+
|Indianapolis|United States|39.768379|-86.158043|
+------------+-------------+---------+----------+



In [14]:
# You can add a new column like this.
# Use lit to add a constant value; lit stands for literal.
from pyspark.sql.functions import lit
df2 = df1.withColumn("new_column",lit("this is sample"))
df2.show()

+-------------+-------------+---------+-----------+--------------+
|         City|      Country| Latitude|  Longitude|    new_column|
+-------------+-------------+---------+-----------+--------------+
|    Vancouver|       Canada| 49.24966|-123.119339|this is sample|
|     Portland|United States|45.523449|-122.676208|this is sample|
|San Francisco|United States|37.774929|-122.419418|this is sample|
|      Seattle|United States|47.606209|-122.332069|this is sample|
|  Los Angeles|United States|34.052231|-118.243683|this is sample|
|    San Diego|United States|32.715328|-117.157257|this is sample|
|    Las Vegas|United States|36.174969|-115.137222|this is sample|
|      Phoenix|United States| 33.44838|-112.074043|this is sample|
|  Albuquerque|United States|35.084492|-106.651138|this is sample|
|       Denver|United States|39.739151|-104.984703|this is sample|
|  San Antonio|United States| 29.42412| -98.493629|this is sample|
|       Dallas|United States|32.783058| -96.806671|this is sam

In [15]:
# drop column
df2.drop("Longitude").show()

+-------------+-------------+---------+--------------+
|         City|      Country| Latitude|    new_column|
+-------------+-------------+---------+--------------+
|    Vancouver|       Canada| 49.24966|this is sample|
|     Portland|United States|45.523449|this is sample|
|San Francisco|United States|37.774929|this is sample|
|      Seattle|United States|47.606209|this is sample|
|  Los Angeles|United States|34.052231|this is sample|
|    San Diego|United States|32.715328|this is sample|
|    Las Vegas|United States|36.174969|this is sample|
|      Phoenix|United States| 33.44838|this is sample|
|  Albuquerque|United States|35.084492|this is sample|
|       Denver|United States|39.739151|this is sample|
|  San Antonio|United States| 29.42412|this is sample|
|       Dallas|United States|32.783058|this is sample|
|      Houston|United States|29.763281|this is sample|
|  Kansas City|United States|39.099731|this is sample|
|  Minneapolis|United States|44.979969|this is sample|
|  Saint L

In [11]:
## Selecting only required columns

df1.select("City","Country").show()

+-------------+-------------+
|         City|      Country|
+-------------+-------------+
|    Vancouver|       Canada|
|     Portland|United States|
|San Francisco|United States|
|      Seattle|United States|
|  Los Angeles|United States|
|    San Diego|United States|
|    Las Vegas|United States|
|      Phoenix|United States|
|  Albuquerque|United States|
|       Denver|United States|
|  San Antonio|United States|
|       Dallas|United States|
|      Houston|United States|
|  Kansas City|United States|
|  Minneapolis|United States|
|  Saint Louis|United States|
|      Chicago|United States|
|    Nashville|United States|
| Indianapolis|United States|
|      Atlanta|United States|
+-------------+-------------+
only showing top 20 rows



In [18]:
# renaming column
df1.withColumnRenamed("City","RenamedCity").show()

+-------------+-------------+---------+-----------+
|  RenamedCity|      Country| Latitude|  Longitude|
+-------------+-------------+---------+-----------+
|    Vancouver|       Canada| 49.24966|-123.119339|
|     Portland|United States|45.523449|-122.676208|
|San Francisco|United States|37.774929|-122.419418|
|      Seattle|United States|47.606209|-122.332069|
|  Los Angeles|United States|34.052231|-118.243683|
|    San Diego|United States|32.715328|-117.157257|
|    Las Vegas|United States|36.174969|-115.137222|
|      Phoenix|United States| 33.44838|-112.074043|
|  Albuquerque|United States|35.084492|-106.651138|
|       Denver|United States|39.739151|-104.984703|
|  San Antonio|United States| 29.42412| -98.493629|
|       Dallas|United States|32.783058| -96.806671|
|      Houston|United States|29.763281| -95.363274|
|  Kansas City|United States|39.099731| -94.578568|
|  Minneapolis|United States|44.979969|  -93.26384|
|  Saint Louis|United States| 38.62727| -90.197891|
|      Chica

### You can do joins, unions, and much more with Spark

#### To join, you need two DataFrames. Here I create two DataFrames and join them.

In [3]:
dataframe1 = spark.read.option("header","true").csv("./spark-demo/imdb_data_sample.csv")

In [7]:
dataframe2 = spark.read.option("header","true").option("delimiter",";").csv("./spark-demo/AllMoviesDetailsCleaned.csv")

## You need to specify which columns to join on, similar to a SQL query

**Example SQL query:** we join two tables in SQL as shown below, and the same idea applies to DataFrames.

- `SELECT * FROM table1 a JOIN table2 b ON a.some_column = b.some_column`

In [5]:
dataframe1.printSchema()

root
 |-- imdb_title_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- year: string (nullable = true)
 |-- date_published: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = true)
 |-- director: string (nullable = true)
 |-- writer: string (nullable = true)
 |-- production_company: string (nullable = true)
 |-- actors: string (nullable = true)
 |-- description: string (nullable = true)
 |-- avg_vote: string (nullable = true)
 |-- votes: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- usa_gross_income: string (nullable = true)
 |-- worlwide_gross_income: string (nullable = true)
 |-- metascore: string (nullable = true)
 |-- reviews_from_users: string (nullable = true)
 |-- reviews_from_critics: string (nullable = true)



In [8]:
dataframe2.printSchema()

root
 |-- id: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- production_companies_number: string (nullable = true)
 |-- production_countries_number: string (nullable = true)
 |-- spoken_languages_number: string (nullable = true)



In [16]:
from pyspark.sql.functions import col
dataframe2.join(dataframe1, col("imdb_id") == col("imdb_title_id")).show()

+---+--------+--------------------+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+--------+-------+----------------+--------+--------------------+--------------------+------------+----------+---------------------------+---------------------------+-----------------------+-------------+--------------------+--------------------+----+--------------+--------------------+--------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+------+-----------+----------------+---------------------+---------+------------------+--------------------+
| id|  budget|              genres|  imdb_id|original_language|      original_title|            overview|popularity|production_companies|production_countries|release_date| revenue|runtime|spoken_languages|  status|             tagline|               title|vote_average|vote_cou

In [17]:
one_data_for_union = dataframe1.select("budget")
second_data_for_union = dataframe2.select("budget")

result = one_data_for_union.union(second_data_for_union)
result.show()

+----------+
|    budget|
+----------+
|      null|
|    $ 2250|
|      null|
|   $ 45000|
|      null|
|      null|
|      null|
| ITL 45000|
|ROL 400000|
|   $ 30000|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|    $ 5700|
+----------+
only showing top 20 rows

