In [2]:
!pip3 install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f2/64/a1df4440483df47381bbbf6a03119ef66515cf2e1a766d9369811575454b/pyspark-2.4.1.tar.gz (215.7MB)
Collecting py4j==0.10.7 (from pyspark)
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Stored in directory: /root/.cache/pip/wheels/47/9b/57/7984bf19763749a13eece44c3174adb6ae4bc95b920375ff50
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.1


In [0]:
from pyspark.sql import SparkSession

spark=SparkSession\
.builder\
.appName("basic")\
.config("spark.some.config.option","some-value")\
.getOrCreate()

In [4]:
# Spark 2.x ships a built-in CSV reader, so the short format name "csv" is enough
df = (spark.read.format("csv")
      .options(header="true", inferSchema="true")
      .load("sample_data/california_housing_train.csv"))

df.show(5)
df.printSchema()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

# Jargon of Apache Spark


*   **Job:** A piece of code that reads some input from HDFS or the local file system, performs some computation on the
data, and writes some output data.


*   **Stages:** Jobs are divided into stages. Stages are classified as map or reduce stages (this is easier to
understand if you have worked with Hadoop and want to correlate the two). Stages are divided at computational
boundaries: not all computations (operators) can be executed in a single stage, so a job runs
over many stages.


*   **Tasks:** Each stage has some tasks, one task per partition. One task is executed on one partition of data
on one executor.

*   **DAG:** DAG stands for Directed Acyclic Graph; in the present context it is a DAG of operators.
*   **Executor:** The process responsible for executing a task.


*   **Master:** The machine on which the Driver program runs.

*   **Slave:** The machine on which the Executor program runs.
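
To make the stage/task vocabulary concrete, here is a minimal plain-Python sketch. It is not Spark's API: the helpers `split_into_partitions` and `run_stage` are hypothetical names, and the only point is that a stage runs one task per partition and an action then combines the task results.

```python
# Plain-Python illustration only; Spark's real scheduler is far more involved.

def split_into_partitions(data, num_partitions):
    """Split a list into num_partitions chunks (hypothetical helper)."""
    return [data[i::num_partitions] for i in range(num_partitions)]

def run_stage(partitions, func):
    """Run one 'task' per partition and return the per-partition results."""
    return [[func(x) for x in part] for part in partitions]

data = list(range(1, 9))
partitions = split_into_partitions(data, 4)       # 4 partitions -> 4 tasks in the stage
squared = run_stage(partitions, lambda x: x * x)  # each task sees only its own partition

print(len(partitions))                            # number of tasks in the stage
print(sum(sum(part) for part in squared))         # an "action": combine the task results
```

Changing `num_partitions` changes how many tasks the toy stage runs, but not the combined result.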





---
# Spark Components



> 1. **Spark Driver**


*   a separate process that executes user applications
*   creates the SparkContext to schedule job execution and negotiate with the cluster manager



> 2. **Cluster Manager**



*   Mesos
*   YARN
*   Standalone



> 3. **Executors**


*   run tasks scheduled by the driver
*   store computation results in memory, on disk, or off-heap
*   interact with storage systems

![alt text](http://web.utk.edu/~wfeng1/spark/_images/spark-components.png)





















The Spark Driver contains several more components responsible for translating user code into the actual jobs executed
on the cluster:


*   **SparkContext:** represents the connection to a Spark cluster, and can be used to create RDDs, accumulators,
and broadcast variables on that cluster
*   **DAGScheduler:** computes a DAG of stages for each job and submits them to the TaskScheduler; determines
preferred locations for tasks (based on cache status or shuffle file locations)
and finds a minimal schedule to run the jobs


*   **TaskScheduler:** responsible for sending tasks to the cluster, running them, retrying if there are failures,
and mitigating stragglers
*   **SchedulerBackend:** backend interface for scheduling systems that allows plugging in different implementations
(Mesos, YARN, Standalone, local)


*   **BlockManager:** provides interfaces for putting and retrieving blocks, both locally and remotely, into
various stores (memory, disk, and off-heap)


Spark has a small code base, and the system is divided into various layers. Each layer has its own responsibilities,
and the layers are independent of each other.
The first layer is the interpreter: Spark uses a Scala interpreter with some modifications. As you enter
your code in the Spark console (creating RDDs and applying operators), Spark creates an operator graph. When
the user runs an action (like collect), the graph is submitted to the DAG scheduler. The DAG scheduler
divides the operator graph into (map and reduce) stages. A stage comprises tasks based on partitions of
the input data. The DAG scheduler pipelines operators together to optimize the graph; for example, many map
operators can be scheduled in a single stage. This optimization is key to Spark's performance. The final
result of the DAG scheduler is a set of stages. The stages are passed on to the task scheduler, which
launches tasks via the cluster manager (Spark Standalone, YARN, or Mesos). The task scheduler doesn't
know about dependencies among stages.
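
The pipelining idea can be sketched in a few lines of plain Python. This is a toy model, not the DAGScheduler itself: `build_stages` is a hypothetical helper, the `is_wide` flag is an assumption standing in for "this operator needs a shuffle", and real Spark splits a shuffle operator such as `reduceByKey` across the two neighbouring stages rather than placing it wholly in the second one.

```python
# Toy model of stage construction; the operator list and "wide" flags are illustrative.

def build_stages(operators):
    """Pipeline consecutive narrow operators into one stage.

    A wide operator (one that needs a shuffle) starts a new stage.
    """
    stages = [[]]
    for name, is_wide in operators:
        if is_wide and stages[-1]:
            stages.append([])       # shuffle boundary: close the current stage
        stages[-1].append(name)
    return stages

plan = [("map", False), ("filter", False), ("reduceByKey", True), ("map", False)]
stages = build_stages(plan)
print(stages)   # [['map', 'filter'], ['reduceByKey', 'map']]
```

The two narrow operators at the start end up in the same stage, which is the pipelining described above.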









# PROGRAMMING WITH RDDS

An RDD in Spark is simply an immutable distributed
collection of objects. Each RDD is split into multiple partitions (smaller sets with the same structure),
which may be computed on different nodes of the cluster.

# Create RDD

There are two common ways to create RDDs:

*   loading an external dataset

*   distributing a collection of objects





In [5]:
dataframe=spark.sparkContext.parallelize([(1,2,3,"a b c"),(4,5,6,"d e f"),(7,8,9,"g h i")]).toDF(['col1', 'col2', 'col3','col4'])
dataframe.show()

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|   4|   5|   6|d e f|
|   7|   8|   9|g h i|
+----+----+----+-----+



In [6]:
# parallelize() returns an RDD (not a DataFrame); collect() brings it back to the driver as a list
dataframe1=spark.sparkContext.parallelize([(1,2,3,"a b c"),(4,5,6,"d e f"),(7,8,9,"g h i")])
dataframe1.collect()

[(1, 2, 3, 'a b c'), (4, 5, 6, 'd e f'), (7, 8, 9, 'g h i')]

In [7]:
dataframe2=spark.createDataFrame([("1","2","3","a b c"),("4","5","6","d e f"),("7","8","9","g h i")],['col1', 'col2', 'col3','col4'])
dataframe2.show()

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|   4|   5|   6|d e f|
|   7|   8|   9|g h i|
+----+----+----+-----+



# Spark Operations



1.   **Transformations:** Transformations construct a new RDD from a previous one. For example, one common transformation is
filtering data that matches a predicate.
2.   **Actions:** Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or
save it to an external storage system (e.g., HDFS).
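
The difference between the two kinds of operation shows up mainly in *when* work happens: transformations only describe a new dataset, and nothing runs until an action asks for a result. A minimal sketch of that behaviour in plain Python follows; `LazyDataset` is a hypothetical toy class, not part of PySpark.

```python
class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, actions execute them."""

    def __init__(self, data, ops=()):
        self._data = list(data)
        self._ops = tuple(ops)

    def map(self, func):                      # transformation: returns a new dataset
        return LazyDataset(self._data, self._ops + (("map", func),))

    def filter(self, predicate):              # transformation: returns a new dataset
        return LazyDataset(self._data, self._ops + (("filter", predicate),))

    def collect(self):                        # action: runs the recorded operations
        items = self._data
        for kind, func in self._ops:
            if kind == "map":
                items = [func(x) for x in items]
            else:
                items = [x for x in items if func(x)]
        return items

calls = []

def traced_double(x):
    calls.append(x)                           # record that the function really ran
    return x * 2

numbers = LazyDataset([1, 2, 3, 4])
big = numbers.map(traced_double).filter(lambda x: x > 4)
calls_before_action = len(calls)              # still 0: nothing has been computed yet
result = big.collect()

print(calls_before_action, result)            # 0 [6, 8]
```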

# rdd.DataFrame vs pd.DataFrame



In [0]:
my_list = [['a', 1, 2], ['b', 2, 3],['c', 3, 4]]
col_name = ['A', 'B', 'C']

In [9]:
import pandas as pd
# caution for the columns=
pd.DataFrame(my_list,columns= col_name)

   A  B  C
0  a  1  2
1  b  2  3
2  c  3  4


Pay attention to the **columns=** keyword in **pd.DataFrame**. The second positional parameter is the row index,
so passing the names without the keyword labels the rows instead of the columns.

In [10]:
pd.DataFrame(my_list, col_name)

   0  1  2
A  a  1  2
B  b  2  3
C  c  3  4


In [11]:
spark.createDataFrame(my_list, col_name).show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  1|  2|
|  b|  2|  3|
|  c|  3|  4|
+---+---+---+



# From Dict

In [0]:
d = {'A': [0, 1, 0],
'B': [1, 0, 1],
'C': [1, 0, 0]}

In [13]:
pd.DataFrame(d)

   A  B  C
0  0  1  1
1  1  0  0
2  0  1  0


In [14]:
import numpy as np
spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  0|  1|  1|
|  1|  0|  0|
|  0|  1|  0|
+---+---+---+



# Reading Data From CSV Files

In [15]:
dps = pd.read_csv('sample_data/california_housing_train.csv')
dps.head(3)

   longitude  latitude  housing_median_age  total_rooms  total_bedrooms  population  households  median_income  median_house_value
0    -114.31     34.19                15.0       5612.0          1283.0      1015.0       472.0         1.4936             66900.0
1    -114.47     34.40                19.0       7650.0          1901.0      1129.0       463.0         1.8200             80100.0
2    -114.56     33.69                17.0        720.0           174.0       333.0       117.0         1.6509             85700.0


In [16]:
dss = spark.read.csv(path='sample_data/california_housing_train.csv',sep=",",encoding="UTF-8",comment=None,header=True,inferSchema=True)
dss.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows



In [17]:
dps.columns

Index(['longitude', 'latitude', 'housing_median_age', 'total_rooms',
       'total_bedrooms', 'population', 'households', 'median_income',
       'median_house_value'],
      dtype='object')

In [18]:
dss.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [19]:
dps.dtypes

longitude             float64
latitude              float64
housing_median_age    float64
total_rooms           float64
total_bedrooms        float64
population            float64
households            float64
median_income         float64
median_house_value    float64
dtype: object

In [20]:
dss.dtypes

[('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'double'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double')]

# Fill Null

In [21]:
my_list = [['a', 1, None], ['b', 2, 3],['c', 3, 4]]
dp = pd.DataFrame(my_list,columns=['A', 'B', 'C'])
ds = spark.createDataFrame(my_list, ['A', 'B', 'C'])
#
print(dp.head())
print("*"*20)
ds.show()

   A  B    C
0  a  1  NaN
1  b  2  3.0
2  c  3  4.0
********************
+---+---+----+
|  A|  B|   C|
+---+---+----+
|  a|  1|null|
|  b|  2|   3|
|  c|  3|   4|
+---+---+----+



In [22]:
print(dp.fillna(-99))
print("*"*20)
ds.fillna(-99).show()

   A  B     C
0  a  1 -99.0
1  b  2   3.0
2  c  3   4.0
********************
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  1|-99|
|  b|  2|  3|
|  c|  3|  4|
+---+---+---+



In [23]:
mapping = {'C':'Newspaper'}
print(dp.rename(columns=mapping).head(4))
print("*"*20)
new_names = [mapping.get(col,col) for col in ds.columns]
ds.toDF(*new_names).show(4)

   A  B  Newspaper
0  a  1        NaN
1  b  2        3.0
2  c  3        4.0
********************
+---+---+---------+
|  A|  B|Newspaper|
+---+---+---------+
|  a|  1|     null|
|  b|  2|        3|
|  c|  3|        4|
+---+---+---------+



In [24]:
ds.withColumnRenamed('C','Paper').show()

+---+---+-----+
|  A|  B|Paper|
+---+---+-----+
|  a|  1| null|
|  b|  2|    3|
|  c|  3|    4|
+---+---+-----+



In [25]:
drop_name=["A"]
print(dp.drop(drop_name,axis=1).head())
print("*"*20)
ds.drop(*drop_name).show()


   B    C
0  1  NaN
1  2  3.0
2  3  4.0
********************
+---+----+
|  B|   C|
+---+----+
|  1|null|
|  2|   3|
|  3|   4|
+---+----+



# Filtering

In [26]:
print(dps[dps.housing_median_age>18].head(3))
print("-"*150)
# show() prints the table itself and returns None, so it is not wrapped in print()
dss[dss.housing_median_age>18].show(3)

   longitude  latitude  housing_median_age  total_rooms  total_bedrooms  \
1    -114.47     34.40                19.0       7650.0          1901.0   
4    -114.57     33.57                20.0       1454.0           326.0   
5    -114.58     33.63                29.0       1387.0           236.0   

   population  households  median_income  median_house_value  
1      1129.0       463.0         1.8200             80100.0  
4       624.0       262.0         1.9250             65500.0  
5       671.0       239.0         3.3438             74000.0  
------------------------------------------------------------------------------------------------------------------------------------------------------
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+-

In [39]:
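from pyspark.sql import functions as F
# pandas: divide each value by the column total
dps['housing_median_age_norm'] = dps.housing_median_age/dps.housing_median_age.sum()
print(dps.head(4))
# Spark: compute the column total once on the driver, then divide each value by it
age_total = dss.groupBy().agg(F.sum("housing_median_age")).collect()[0][0]
dss.withColumn('housing_median_age_norm', dss.housing_median_age/age_total).show(4)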

   longitude  latitude  housing_median_age  total_rooms  total_bedrooms  \
0    -114.31     34.19                15.0       5612.0          1283.0   
1    -114.47     34.40                19.0       7650.0          1901.0   
2    -114.56     33.69                17.0        720.0           174.0   
3    -114.57     33.64                14.0       1501.0           337.0   

   population  households  median_income  median_house_value  \
0      1015.0       472.0         1.4936             66900.0   
1      1129.0       463.0         1.8200             80100.0   
2       333.0       117.0         1.6509             85700.0   
3       515.0       226.0         3.1917             73400.0   

   housing_median_age_norm  
0                 0.000031  
1                 0.000039  
2                 0.000035  
3                 0.000029  
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------------------+
|longitude|l

# Statistical Tests
# Correlational Test

**Pearson correlation:** Tests for the strength of the association between two **continuous** variables.

**Chi-square:** Tests for the strength of the association between two **categorical** variables.

**Spearman correlation:** Tests for the strength of the association between two **ordinal** variables (does
not rely on the assumption of normally distributed data).
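
As a rough illustration of what the three statistics measure, the sketch below computes them by hand in plain Python. The function names and the small inputs are made up for this example; in practice a statistics library would be used, and these helpers return only the statistic, not a p-value.

```python
import math

def pearson(x, y):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    spread_x = math.sqrt(sum((a - mean_x) ** 2 for a in x))
    spread_y = math.sqrt(sum((b - mean_y) ** 2 for b in y))
    return cov / (spread_x * spread_y)

def average_ranks(values):
    """1-based ranks; tied values share the mean of the positions they occupy."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    start = 0
    while start < len(order):
        end = start
        while end + 1 < len(order) and values[order[end + 1]] == values[order[start]]:
            end += 1
        shared_rank = (start + end) / 2 + 1
        for k in range(start, end + 1):
            ranks[order[k]] = shared_rank
        start = end + 1
    return ranks

def spearman(x, y):
    """Spearman correlation: the Pearson correlation of the ranks."""
    return pearson(average_ranks(x), average_ranks(y))

def chi_square_statistic(table):
    """Chi-square statistic of a contingency table given as a list of rows."""
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    total = sum(row_totals)
    stat = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            expected = row_totals[i] * col_totals[j] / total
            stat += (observed - expected) ** 2 / expected
    return stat

x = [1, 2, 3, 4, 5]
y = [v ** 2 for v in x]            # monotonic in x, but not linear
print(round(pearson(x, y), 4))     # below 1: the relationship is not a straight line
print(round(spearman(x, y), 4))    # 1.0: the ordering is perfectly preserved
print(round(chi_square_statistic([[10, 20], [20, 10]]), 4))
```

The gap between the two correlation values on the same data shows why Spearman is the usual choice when only the ordering of the values is meaningful.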

# Comparison of Means test


**Paired T-test:** Tests for a difference between the means of two related (paired) samples.

**Independent T-test:** Tests for a difference between the means of two independent samples.
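
For the paired case, the test statistic is the mean of the pairwise differences divided by its standard error. The sketch below computes only that statistic in plain Python; the function name and the `before`/`after` numbers are invented for illustration, and turning the statistic into a p-value (using a t distribution with n - 1 degrees of freedom) is left to a statistics library.

```python
import math
import statistics

def paired_t_statistic(before, after):
    """t statistic for paired samples: mean difference over its standard error."""
    diffs = [a - b for a, b in zip(after, before)]
    n = len(diffs)
    mean_diff = statistics.mean(diffs)
    sd_diff = statistics.stdev(diffs)          # sample standard deviation (n - 1)
    return mean_diff / (sd_diff / math.sqrt(n))

before = [10, 12, 9, 11]                       # made-up measurements
after = [12, 14, 10, 13]                       # the same subjects, measured again
t_stat = paired_t_statistic(before, after)
print(round(t_stat, 4))                        # 7.0
```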





In [0]:
def describe_pd(df_in, columns, deciles=False):
  '''
  Function to union the basic stats results and deciles
  :param df_in: the input dataframe
  :param columns: the column name list of the numerical variables
  :param deciles: if True, report deciles (0%-100%) instead of quartiles
  :return : the numerical describe info. of the input dataframe
  :author: sayajit maitra
  :email: copyright@gmail.com
  '''
  if deciles:
    percentiles = np.array(range(0, 110, 10))
  else:
    percentiles = [25, 50, 75]
  # collect() pulls each column to the driver, so this only suits data that fits in memory
  percs = np.transpose([np.percentile(df_in.select(x).collect(),percentiles) for x in columns])
  percs = pd.DataFrame(percs, columns=columns)
  percs['summary'] = [str(p) + '%' for p in percentiles]
  # describe() returns its statistics as strings, which is why they appear unrounded below
  spark_describe = df_in.describe().toPandas()
  new_df = pd.concat([spark_describe, percs],ignore_index=True)
  new_df = new_df.round(2)
  return new_df[['summary'] + columns]

In [31]:
describe_pd(dss,['housing_median_age'])

   summary  housing_median_age
0    count             17000.0
1     mean   28.58935294117647
2   stddev  12.586936981660406
3      min                 1.0
4      max                52.0
5      25%                18.0
6      50%                29.0
7      75%                37.0


In [32]:
describe_pd(dss,['housing_median_age'],True)

   summary  housing_median_age
0    count             17000.0
1     mean   28.58935294117647
2   stddev  12.586936981660406
3      min                 1.0
4      max                52.0
5       0%                 1.0
6      10%                12.0
7      20%                17.0
8      30%                20.0
9      40%                25.0
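
The percentile rows come from `np.percentile`, which by default interpolates linearly between neighbouring sorted values. A plain-Python sketch of that rule, with a hypothetical helper name and made-up input values, looks like this:

```python
import math

def percentile_linear(values, q):
    """q-th percentile (0-100) using linear interpolation between order statistics."""
    ordered = sorted(values)
    position = (len(ordered) - 1) * q / 100    # fractional index into the sorted data
    lower = math.floor(position)
    upper = math.ceil(position)
    fraction = position - lower
    return ordered[lower] + fraction * (ordered[upper] - ordered[lower])

sample = [40, 10, 30, 20]                      # made-up values, not the housing data
quartiles = [percentile_linear(sample, q) for q in (25, 50, 75)]
print(quartiles)                               # [17.5, 25.0, 32.5]
```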


In [33]:
dss.select(['housing_median_age']).describe().show()  # default description of a continuous column

+-------+------------------+
|summary|housing_median_age|
+-------+------------------+
|  count|             17000|
|   mean| 28.58935294117647|
| stddev|12.586936981660406|
|    min|               1.0|
|    max|              52.0|
+-------+------------------+



In [0]:
import matplotlib.pyplot as plt


bins=np.arange(0.0,60.0,5.0)

plt.figure(figsize=(10,8))
plt.hist(dss.,bins,alpha=0.8,histtype="barstacked")


In [0]:
from pyspark.sql import functions as F
# note: importing sum here shadows Python's built-in sum() for the rest of the notebook
from pyspark.sql.functions import sum,col,rank
from pyspark.sql import Window

In [0]:
window=Window.rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
tab=dss

In [43]:
dss.dtypes

[('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'double'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double')]