## Basics of PySpark DataFrame


### Apache Spark


__[Apache Spark](https://spark.apache.org/)__ is a widely used framework for large-scale data processing. It is arguably the framework with the **highest potential to realize the fruit of the marriage between Big Data and Machine Learning.** It runs fast (up to 100x faster than traditional __[Hadoop MapReduce](https://www.tutorialspoint.com/hadoop/hadoop_mapreduce.htm)__ for some workloads) thanks to in-memory operation, offers robust, distributed, fault-tolerant data objects (called __[RDD](https://www.tutorialspoint.com/apache_spark/apache_spark_rdd.htm)__), and integrates well with machine learning and graph analytics through supplementary packages like __[MLlib](https://spark.apache.org/mllib/)__ and __[GraphX](https://spark.apache.org/graphx/)__.

Spark can run on top of Hadoop/HDFS and is written mostly in Scala, a functional programming language that runs on the JVM and interoperates with Java. Running Spark therefore requires a compatible Java installation on your system. However, Scala is not the first language most beginners learn when they venture into data science. Fortunately, Spark provides a Python integration, called PySpark, which lets Python programmers interface with the Spark framework, manipulate data at scale, and work with objects and algorithms over a distributed file system.


### DataFrame

In Apache Spark, a DataFrame is a distributed collection of rows under named columns. It is conceptually equivalent to a table in a relational database, an Excel sheet with column headers, or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs. A DataFrame also shares some common characteristics with an RDD:

- **Immutable in nature**: A DataFrame / RDD cannot be changed once it is created. Applying a transformation returns a new DataFrame / RDD and leaves the original untouched.
- **Lazy evaluation**: A task is not executed until an action is performed.
- **Distributed**: Both RDDs and DataFrames are distributed in nature.

#### Advantages of the DataFrame

- DataFrames are designed for processing large collections of structured or semi-structured data.
- Observations in a Spark DataFrame are organised under named columns, which lets Apache Spark understand the schema of the DataFrame and optimize the execution plan for queries on it.
- A DataFrame in Apache Spark can handle petabytes of data.
- DataFrames support a wide range of data formats and sources.
- The DataFrame API is available in several languages, including Python, R, Scala, and Java.

##### PySpark isn't on sys.path by default, but that doesn't mean it can't be used as a regular library. You can address this by either symlinking pyspark into your site-packages, or adding pyspark to sys.path at runtime. findspark does the latter.

In [2]:
import findspark
findspark.init()

In [3]:
import pyspark

In [4]:
from pyspark import SparkContext
from pyspark.sql import Row

#### Create a SparkSession app object

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark1 = SparkSession.builder.appName('Basics').getOrCreate()

In [7]:
# Read in a JSON file and examine
df = spark1.read.json('Data/people.json')

In [8]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [12]:
# Use printSchema() to show the schema of the data. Note how tightly it is integrated with the SQL-like framework.
# You can even see that the schema accepts null values, because the nullable property is set to true.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [14]:
# Fortunately, a simple columns attribute returns the column names as a Python list
col_list=df.columns
col_list

['age', 'name']

In [15]:
# Similar to Pandas, the describe method is used for the statistical summary
# But unlike Pandas, calling only describe() returns a DataFrame!
df.describe()

DataFrame[summary: string, age: string, name: string]

In [16]:
# True to the spirit of lazy evaluation, you have to evaluate the resulting DataFrame by calling show()
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [17]:
# You can also use summary() method for more descriptive statistics including quartiles
df.summary().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    25%|                19|   null|
|    50%|                19|   null|
|    75%|                30|   null|
|    max|                30|Michael|
+-------+------------------+-------+



### How you can define your own Data Schema

In [18]:
# Import data types and structure types to build the data schema yourself
from pyspark.sql.types import StructField, IntegerType, StringType, StructType

In [19]:
# Define your data schema by supplying name and data types to the structure fields you will be importing
data_schema = [StructField('age',IntegerType(),True),
              StructField('name',StringType(),True)]

In [20]:
# Now create a StructType with this list of fields
final_struc = StructType(fields=data_schema)

In [21]:
# Now read in the same old JSON with this new schema
df = spark1.read.json('Data/people.json',schema=final_struc)
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [22]:
# Now when you print the schema, you will see that age is read as integer and not long.
# When inferring a schema from JSON, Spark cannot know the exact type you want for this column, so it defaults to long.
# This is how you can build your own schema and instruct Spark to read the data accordingly.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [25]:
# How do you extract a single column as a DataFrame? Use select()
dr = df.select('age')

In [26]:
dr.show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [27]:
# What is a Row object? head(n) returns the first n rows as a list of Row objects
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [28]:
df.head(2)[0]

Row(age=None, name='Michael')

In [29]:
row0=df.head(2)[0]
# You can get back a normal Python dictionary from the Row object
row0.asDict()
# Remember that in a Pandas DataFrame, both a column and a row are pandas.Series objects.
# Spark offers separate Column and Row objects because the data is distributed,
# and this distinction comes in handy there.

{'age': None, 'name': 'Michael'}

In [30]:
# Creating a new column
# You cannot think like Pandas here. The following assignment raises an error
df['newage']=2*df['age']

TypeError: 'DataFrame' object does not support item assignment

In [31]:
# withColumn() returns a new DataFrame. If the result is not assigned, df itself is unchanged
df.withColumn('newage', 2*df['age'])
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [32]:
# Use the withColumn() method instead, and assign the result to a new variable
df_new = df.withColumn('newage', 2*df['age'])
df_new.show()

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    60|
|  19| Justin|    38|
+----+-------+------+



#### Just for renaming, use withColumnRenamed() method

In [35]:
df_new.withColumnRenamed('newage', 'double_age').show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



#### You can perform operations on multiple columns, like a vector sum

In [None]:
# Create df2 with a half_age column first, so the next cells can use it
df2 = df.withColumn('half_age', df['age']/2)
df2.show()

In [39]:
df2 = df2.withColumn('new_age',df2['age']+df2['half_age'])
df2.show()

+----+-------+--------+-------+
| age|   name|half_age|new_age|
+----+-------+--------+-------+
|null|Michael|    null|   null|
|  30|   Andy|    15.0|   45.0|
|  19| Justin|     9.5|   28.5|
+----+-------+--------+-------+



In [40]:
# Now if you print the schema, you will see that the data types of half_age and new_age are automatically set to double
# (because of the floating-point division performed)
df2.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- half_age: double (nullable = true)
 |-- new_age: double (nullable = true)



#### A DataFrame is immutable, and there is no in-place option as in Pandas! So the original DataFrame has not changed

In [41]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Integration with SparkSQL - Run SQL query!

You may be wondering why the SparkSession object is imported from the pyspark.sql module. That is because it is tightly integrated with SparkSQL and is designed to work seamlessly with SQL or SQL-like queries for data analytics.

It is good to create a temporary view of the DataFrame. Here people is the name of the SQL table view.

In [42]:
df.createOrReplaceTempView('people')

##### Now run a simple SQL query directly on this view. It returns a DataFrame.

In [44]:
result = spark1.sql("SELECT * FROM people")
result.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [45]:
result_over_25 = spark1.sql("SELECT * FROM people WHERE age > 25")
result_over_25.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

