In [None]:
Apache Spark is written in Scala. To support Python, the Apache Spark community released PySpark, which lets you work with
RDDs in Python. This is made possible by a library called Py4J.

In [None]:
Apache Spark is a fast, general-purpose cluster computing framework. It performs computations in memory, which makes it well
suited to iterative and near-real-time (micro-batch) analysis. PySpark provides the PySpark shell, which links the Python API
to Spark Core and initializes the SparkContext.

In [None]:
SparkContext is the entry point to Spark functionality. When a Spark application runs, a driver program starts; it contains
the main function, and the SparkContext is initialized there. The driver program then schedules the operations as tasks that
run inside executors on the worker nodes.

SparkContext uses Py4J to launch a JVM and create a JavaSparkContext. By default, the PySpark shell makes a SparkContext
available as 'sc'. Only one SparkContext can be active at a time, so creating a new one in the shell fails.

In [None]:
# Spark In-memory Computing

1. Keeping data in memory can improve performance by up to an order of magnitude for workloads that reuse the same data.
   The main abstraction of Spark is the RDD, and RDDs are cached with the cache() or persist() method.
    
2. Spark's in-memory capability suits machine learning and micro-batch processing, because it speeds up iterative jobs
   that read the same data many times.
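To make the benefit for iterative jobs concrete, here is a plain-Python sketch (no Spark involved; `Source` and `iterative_job` are hypothetical names) that counts how often an expensive input is read when it is recomputed on every pass versus kept in memory, which is roughly what cache()/persist() buys an iterative Spark job.

```python
class Source:
    """Hypothetical stand-in for an expensive input (e.g. a file on disk); counts reads."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.reads = 0

    def load(self):
        self.reads += 1
        return list(self.rows)


def iterative_job(source, n_iterations, keep_in_memory):
    """Sum the data n_iterations times, like the passes of an iterative ML loop.

    keep_in_memory=False re-reads the source on every pass (nothing cached);
    keep_in_memory=True reads it once and reuses the in-memory copy.
    """
    kept = source.load() if keep_in_memory else None
    total = 0
    for _ in range(n_iterations):
        rows = kept if keep_in_memory else source.load()
        total += sum(rows)
    return total


recomputed = Source(range(1, 11))
cached = Source(range(1, 11))
print(iterative_job(recomputed, 5, keep_in_memory=False), recomputed.reads)  # 275 5
print(iterative_job(cached, 5, keep_in_memory=True), cached.reads)           # 275 1
```

Both runs give the same answer; only the number of reads of the source differs.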

In [None]:
# Lazy evaluation

1. Lazy evaluation in Spark means that execution does not start until an action is triggered. It applies to
   transformations: calling a transformation only describes a new RDD.
    
2. When we call a transformation on an RDD, it is not executed immediately. Instead, Spark records which operations were
   called in a DAG (directed acyclic graph). An RDD can therefore be thought of as a recipe for data that is built up
   through transformations. Computation happens only when an action is called on the data, so data is not loaded until
   it is actually needed.
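The idea can be modelled in a few lines of plain Python. This is a toy class (`LazyDataset` is a hypothetical name, not the Spark API): transformations only append to a recorded plan, and the plan runs when an action such as `collect()` or `count()` is called.

```python
class LazyDataset:
    """A toy model of an RDD: transformations are recorded, actions run them."""

    def __init__(self, data, ops=None):
        self._data = data
        self._ops = ops or []  # recorded plan (lineage), oldest step first

    # Transformations: return a new dataset and compute nothing
    def map(self, f):
        return LazyDataset(self._data, self._ops + [("map", f)])

    def filter(self, pred):
        return LazyDataset(self._data, self._ops + [("filter", pred)])

    # Actions: evaluate the recorded plan element by element
    def collect(self):
        result = []
        for x in self._data:
            keep = True
            for kind, f in self._ops:
                if kind == "map":
                    x = f(x)
                elif not f(x):  # a filter rejected the element
                    keep = False
                    break
            if keep:
                result.append(x)
        return result

    def count(self):
        return len(self.collect())


calls = []

def square(x):
    calls.append(x)  # records when the function really runs
    return x * x

ds = LazyDataset(range(5)).map(square).filter(lambda v: v % 2 == 0)
print(calls)         # [] : nothing has executed yet
print(ds.collect())  # [0, 4, 16]
print(calls)         # [0, 1, 2, 3, 4]
```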

In [None]:
# Fault tolerance 

1. Since a Spark RDD is an immutable dataset, each RDD remembers the lineage of deterministic operations that were
   applied to a fault-tolerant input dataset to create it.

2. If a partition of an RDD is lost because a worker node fails, that partition can be recomputed from the original
   fault-tolerant dataset using this lineage of operations.

3. As long as all RDD transformations are deterministic, the data in the final transformed RDD is always the same,
   regardless of failures in the Spark cluster.
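Lineage-based recovery can be sketched in plain Python under simplifying assumptions: the source partitions stand in for fault-tolerant input blocks (for example in HDFS), the "lineage" is a list of deterministic per-partition functions, and losing a worker is simulated by deleting one materialised partition. This is an illustration of the idea, not Spark's internal implementation.

```python
# Fault-tolerant input, already split into partitions (e.g. blocks of a file in HDFS)
source_partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

# Recorded lineage: deterministic steps applied to every partition
lineage = [
    lambda part: [x * 10 for x in part],      # like rdd.map(lambda x: x * 10)
    lambda part: [x for x in part if x > 20],  # like .filter(lambda x: x > 20)
]

def compute_partition(i):
    """Rebuild partition i from its source block by replaying the lineage."""
    part = source_partitions[i]
    for step in lineage:
        part = step(part)
    return part

# Results held on (simulated) executors
materialised = {i: compute_partition(i) for i in range(len(source_partitions))}
before = dict(materialised)

del materialised[1]                     # the worker holding partition 1 fails
materialised[1] = compute_partition(1)  # recompute only the lost partition

print(materialised == before)  # True: deterministic steps give the same data
```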

In [None]:
### Transformations & Actions

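# Narrow Transformations (each input partition is used by at most one output partition, so no shuffle is needed)

1. map 2. flatMap 3. mapPartitions 4. filter 5. sample 6. union

# Wide Transformations (data is shuffled across partitions, and usually across nodes)

1. intersection 2. distinct 3. reduceByKey 4. groupByKey 5. join (unless both sides are already co-partitioned)
6. cartesian 7. repartition 8. coalesce (only with shuffle=True; by default coalesce is narrow)

# Actions

1. count 2. first 3. take, takeSample 4. reduce 5. collect 6. top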
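The difference between narrow and wide transformations can be sketched with plain Python lists standing in for partitions (all names here are hypothetical). A narrow step works on each partition independently; a wide step first moves records so that equal keys end up in the same partition. Python's string hash is salted per process, so which partition a key lands in varies between runs, but the final result does not.

```python
from collections import defaultdict

# Two input partitions of (word, 1) pairs, as they might sit on two executors
partitions = [
    [("spark", 1), ("data", 1), ("spark", 1)],
    [("data", 1), ("rdd", 1)],
]

def narrow_map(parts, f):
    """Narrow: each output partition is computed from exactly one input partition."""
    return [[f(rec) for rec in part] for part in parts]

def shuffle_by_key(parts, num_out):
    """Wide: records move between partitions so that equal keys meet in one place."""
    out = [[] for _ in range(num_out)]
    for part in parts:
        for key, value in part:
            out[hash(key) % num_out].append((key, value))  # hash partitioning
    return out

def reduce_by_key(parts, num_out=2):
    """Shuffle, then sum the values per key inside each output partition."""
    result = {}
    for part in shuffle_by_key(parts, num_out):
        sums = defaultdict(int)
        for key, value in part:
            sums[key] += value
        result.update(sums)  # keys never span two output partitions
    return result

upper = narrow_map(partitions, lambda kv: (kv[0].upper(), kv[1]))
print(upper)                                # still 2 partitions, no data moved
print(sorted(reduce_by_key(partitions).items()))  # [('data', 2), ('rdd', 1), ('spark', 2)]
```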

In [None]:
### RDD to DataFrame

In [1]:
import findspark
findspark.init()  # locate the Spark installation; must run before pyspark is imported

from pyspark import SparkContext, SparkConf
from pyspark.rdd import RDD
conf = SparkConf().setAppName("Collinear Points")
sc = SparkContext('local', conf=conf)

In [2]:
dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]
rdd = sc.parallelize(dept)

In [None]:
rdd = sc.textFile("PATH/blogtexts")

In [None]:
rdd.take(5)
# Output:
[u'Think of it for a moment \u2013 1 Qunitillion = 1 Million Billion! Can you imagine how many drives / CDs / Blue-ray DVDs would be required to store them? It is difficult to imagine this scale of data generation even as a data science professional. While this pace of data generation is very exciting,  it has created entirely new set of challenges and has forced us to find new ways to handle Big Huge data effectively.',
 u'',
 u'Big Data is not a new phenomena. It has been around for a while now. However, it has become really important with this pace of data generation. In past, several systems were developed for processing big data. Most of them were based on MapReduce framework. These frameworks typically rely on use of hard disk for saving and retrieving the results. However, this turns out to be very costly in terms of time and speed.',
 u'',
 u'On the other hand, Organizations have never been more hungrier to add a competitive differentiation through understanding this data and offering its customer a much better experience. Imagine how valuable would be Facebook, if it did not understand your interests well? The traditional hard disk based MapReduce kind of frameworks do not help much to address this challenge.'
]

In [None]:
# map
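def Func(lines):
    # Lowercase a line of text and split it into a list of words
    lines = lines.lower()
    lines = lines.split()
    return lines
rdd1 = rdd.map(Func)  # one list of words per input line
rdd1.take(1)
# Output (truncated): [[u'think', u'of', u'it', u'for', u'a', ...]]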

In [None]:
# Flat Map
# The “flatMap” transformation will return a new RDD by first applying a function to all elements of this RDD, and then
# flattening the results

rdd2 = rdd.flatMap(Func)
rdd2.take(5)

# Output: [u'think', u'of', u'it', u'for', u'a']
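The same contrast can be reproduced in plain Python on a few hypothetical lines, with a list comprehension playing the role of `map` and `itertools.chain.from_iterable` playing the role of `flatMap`. Note how an empty line becomes an empty list under `map` but contributes nothing under `flatMap`.

```python
from itertools import chain

def Func(lines):
    # Same helper as in the map cell: lowercase a line and split it into words
    return lines.lower().split()

lines = ["Think of it", "", "Big Data is not new"]

mapped = [Func(line) for line in lines]                         # like rdd.map(Func)
flat = list(chain.from_iterable(Func(line) for line in lines))  # like rdd.flatMap(Func)

print(mapped)  # [['think', 'of', 'it'], [], ['big', 'data', 'is', 'not', 'new']]
print(flat)    # ['think', 'of', 'it', 'big', 'data', 'is', 'not', 'new']
```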

In [None]:
# Transformation: filter
stopwords = ['is','am','are','the','for','a']
rdd3 = rdd2.filter(lambda x: x not in stopwords)
rdd3.take(10)
# Output: [u'think', u'of', u'it', u'moment', u'\u2013', u'1', u'qunitillion', u'=', u'1', u'million']

In [None]:
# Transformation: groupBy
rdd4 = rdd3.groupBy(lambda w: w[0:3])
print([(k, list(v)) for (k, v) in rdd4.take(1)])
# Output: [(u'all', [u'all', u'allocates', u'all', u'all', u'allows', u'all', u'all', u'all', u'all', u'all', u'all', u'all'])]

In [None]:
# Transformation: groupByKey / reduceByKey 
rdd3_mapped = rdd3.map(lambda x: (x,1))
rdd3_grouped = rdd3_mapped.groupByKey()
print(list((j[0], list(j[1])) for j in rdd3_grouped.take(5)))
# Output: [(u'all', [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), (u'elements,', [1, 1]), (u'step2:', [1]), (u'manager', [1]), (u'(if', [1])]

In [None]:
rdd3_freq_of_words = rdd3_grouped.mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(False)
rdd3_freq_of_words.take(10)
# Output:
[(164, u'to'),
 (143, u'in'),
 (122, u'of'),
 (106, u'and'),
 (103, u'we'),
 (69, u'spark'),
 (64, u'this'),
 (63, u'data'),
 (55, u'can'),
 (52, u'apache')]

In [None]:
rdd3_mapped.reduceByKey(lambda x,y: x+y).map(lambda x:(x[1],x[0])).sortByKey(False).take(10)
# Output:
[(164, u'to'),
 (143, u'in'),
 (122, u'of'),
 (106, u'and'),
 (103, u'we'),
 (69, u'spark'),
 (64, u'this'),
 (63, u'data'),
 (55, u'can'),
 (52, u'apache')]
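Both cells above produce the same word counts, but they move different amounts of data: `groupByKey` ships every `(word, 1)` pair across the network, while `reduceByKey` first combines values inside each partition. The plain-Python sketch below (hypothetical partitions, no Spark) counts the records each strategy would shuffle.

```python
from collections import Counter, defaultdict

# Words as they might be spread over three partitions
partitions = [
    ["to", "in", "to", "spark"],
    ["to", "data", "in"],
    ["spark", "to", "to"],
]

def group_by_key_shuffle(parts):
    """groupByKey-style: every (word, 1) pair is shuffled, then summed."""
    shuffled = [(w, 1) for part in parts for w in part]
    groups = defaultdict(list)
    for w, one in shuffled:
        groups[w].append(one)
    return {w: sum(ones) for w, ones in groups.items()}, len(shuffled)

def reduce_by_key_shuffle(parts):
    """reduceByKey-style: combine within each partition, then shuffle partial sums."""
    partials = [Counter(part) for part in parts]  # map-side combine
    shuffled = [(w, n) for c in partials for w, n in c.items()]
    totals = defaultdict(int)
    for w, n in shuffled:
        totals[w] += n
    return dict(totals), len(shuffled)

print(group_by_key_shuffle(partitions))   # same totals, 10 records shuffled
print(reduce_by_key_shuffle(partitions))  # same totals, 8 records shuffled
```

The saving is small here, but it grows with the number of repeated keys per partition, which is why `reduceByKey` is usually preferred for aggregations.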

In [59]:
data = ["Project",
"Gutenberg’s",
"Alice’s",
"Adventures",
"in",
"Wonderland",
"Project",
"Gutenberg’s",
"Adventures",
"in",
"Wonderland",
"Project",
"Gutenberg’s"]

In [40]:
# `spark` is the SparkSession created in the SparkSession cell further below
rdd = spark.sparkContext.parallelize(data)

In [60]:
rdd2=rdd.map(lambda x: (x,1))
for element in rdd2.collect():
    print(element)

('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)


In [42]:
data1 = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), ]

In [61]:
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data1, schema = columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+



In [62]:
# Build "firstname,lastname", keep gender, and double the salary for each Row
rdd2 = df.rdd.map(lambda x: (x[0] + "," + x[1], x[2], x[3] * 2))
df2 = rdd2.toDF(["name", "gender", "new_salary"])
df2.show()

+---------------+------+----------+
|           name|gender|new_salary|
+---------------+------+----------+
|    James,Smith|     M|        60|
|      Anna,Rose|     F|        82|
|Robert,Williams|     M|       124|
+---------------+------+----------+



In [45]:
from pyspark.sql.functions import concat_ws,col,lit

In [63]:
# Same result with DataFrame functions instead of an RDD map; a column expression needs no lit()
df.select(concat_ws(",", df.firstname, df.lastname).alias("name"),
          df.gender, (df.salary * 2).alias("new_salary")).show()

+---------------+------+----------+
|           name|gender|new_salary|
+---------------+------+----------+
|    James,Smith|     M|        60|
|      Anna,Rose|     F|        82|
|Robert,Williams|     M|       124|
+---------------+------+----------+



In [47]:
import pandas as pd

In [48]:
pandasDF = df.toPandas()
for index, row in pandasDF.iterrows():
    print(row['firstname'], row['gender'])

James M
Anna F
Robert M


In [1]:
import os
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.streaming import StreamingContext
from pyspark.sql.types import *

In [2]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()
print(spark.sparkContext)
print("Spark App Name : "+ spark.sparkContext.appName)

<SparkContext master=local[1] appName=SparkByExamples.com>
Spark App Name : SparkByExamples.com


In [None]:
# Importing Data

csv_df = spark.read.csv("......./my_buckets/data.csv")

# Treat the first row as the header
csv_df = spark.read.csv("....../my_buckets/poland_ks", header=True)

# Print schema
csv_df.printSchema()

# Infer column types while reading data (otherwise every column is read as a string)
csv_df = spark.read.csv("....../my_buckets/data.csv", header=True, inferSchema=True)

In [None]:
# Reading JSON
json_df = spark.read.json("....../data.json")

# Reading Parquet
parquet_df = spark.read.parquet("....../data.parquet")

In [None]:
# Define schema

from pyspark.sql.types import *

custom_schema = StructType([
    StructField("CLIENTNUM", StringType()),
    StructField("name", StringType()),
    StructField("superior_emp_id", StringType()),
    StructField("year_joined", DateType()),
    StructField("emp_dept_id", StringType()),
    StructField("gender", StringType()),
    StructField("salary", IntegerType())])

In [10]:
spark = SparkSession.builder.appName('churn-cc').getOrCreate()
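# custom_schema above does not describe this file's 23 columns, so let Spark infer the types instead.
# (When an explicit schema= is passed, inferSchema is ignored.)
df = spark.read.csv('credit_card_churn.csv', header=True, inferSchema=True)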

In [15]:
# df1 is the persisted DataFrame from the Persist cell below (df1 = df.persist())
df1.show(truncate=False)

(wide table output truncated; the 23 column names are listed in the next cell)

In [51]:
df1.columns

['CLIENTNUM',
 'Attrition_Flag',
 'Customer_Age',
 'Gender',
 'Dependent_count',
 'Education_Level',
 'Marital_Status',
 'Income_Category',
 'Card_Category',
 'Months_on_book',
 'Total_Relationship_Count',
 'Months_Inactive_12_mon',
 'Contacts_Count_12_mon',
 'Credit_Limit',
 'Total_Revolving_Bal',
 'Avg_Open_To_Buy',
 'Total_Amt_Chng_Q4_Q1',
 'Total_Trans_Amt',
 'Total_Trans_Ct',
 'Total_Ct_Chng_Q4_Q1',
 'Avg_Utilization_Ratio',
 'Naive_Bayes_Classifier_Attrition_Flag_Card_Category_Contacts_Count_12_mon_Dependent_count_Education_Level_Months_Inactive_12_mon_1',
 'Naive_Bayes_Classifier_Attrition_Flag_Card_Category_Contacts_Count_12_mon_Dependent_count_Education_Level_Months_Inactive_12_mon_2']

In [52]:
df2 = df1.where(col("Education_Level") =="Graduate")

In [58]:
df2.count()

3128

In [None]:
### Persist

1. The persist() method is an optimization mechanism: it stores the intermediate result of a PySpark DataFrame so that it
   can be reused in subsequent actions. Like a transformation, persist() is lazy; the data is stored when the first action
   runs.

2. When we persist a dataset, each node stores its partitions (in memory, and on disk if the storage level allows it) and
   reuses them in other actions on that dataset. Persisted data is also fault-tolerant: if any partition of a dataset is
   lost, it is automatically recomputed using the original transformations that created it.

In [14]:
# Persist the DataFrame
df1 = df.persist()

In [9]:
# Exporting CSV
# coalesce(1) or repartition(1) combines all partitions of the DataFrame into one, so a single CSV part file is written.
# All data then passes through one task, so use this only for small outputs.

partitioned_output.coalesce(1).write.mode("overwrite")\
.format("csv")\
.option("header", "true")\
.option("sep", "|")\
.save('......../my_output_csv')

In [None]:
# selecting column from df
df1 = df.select('c1','c2','c3','c4')
df1.show()

# Renaming column
df1 = df.withColumnRenamed("c1","renamed_c1")

In [None]:
# Changing the schema
from pyspark.sql.types import DoubleType, IntegerType, StringType
cases = cases.withColumn('c1', F.col('c1').cast(IntegerType()))
cases = cases.withColumn('c2', F.col('c2').cast(StringType()))

In [None]:
# FILTER
df.filter((df.c1>10) & (df.c2=='AAB')).show()

In [None]:
# Sorting
df.sort("c1").show()

# descending Sort
from pyspark.sql import functions as F
df.sort(F.desc("c1")).show()

In [None]:
# GroupBy
from pyspark.sql import functions as F
df1 = df.groupBy(["c1","c2"]).agg(F.sum("c3"),F.max("c4"))

# Use alias to rename columns
df1 = df.groupBy(["c1","c2"]).agg(
    F.sum("c3").alias("c3_renamed"),
    F.max("c4").alias("c4_renamed"))

In [None]:
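## Dropping duplicates
from pyspark.sql.window import Window as W
df1 = df.dropDuplicates(["department","salary"])  # keep one row per (department, salary)
# Or flag rows whose (ncf, date) combination occurs more than once instead of dropping them
df_flagged = df.withColumn('duplicated', F.count('*').over(W.partitionBy('ncf', 'date')) > 1)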
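The two approaches in the cell above can be compared on a few hypothetical rows in plain Python: flagging counts how often each key combination occurs (what `count('*')` over a window partitioned by the keys computes), while dropping keeps a single row per key. The sketch keeps the first row it sees; Spark's `dropDuplicates` does not guarantee which of the duplicates survives.

```python
from collections import Counter

# Hypothetical rows; only 'ncf' and 'date' come from the cell above
rows = [
    {"ncf": "A1", "date": "2021-01-01", "amount": 10},
    {"ncf": "A1", "date": "2021-01-01", "amount": 12},
    {"ncf": "B7", "date": "2021-01-02", "amount": 5},
]

def key_of(row, keys):
    return tuple(row[k] for k in keys)

def flag_duplicates(rows, keys):
    """Add duplicated=True when the key combination occurs more than once."""
    counts = Counter(key_of(r, keys) for r in rows)
    return [dict(r, duplicated=counts[key_of(r, keys)] > 1) for r in rows]

def drop_duplicates(rows, keys):
    """Keep the first row seen for each key combination."""
    seen, kept = set(), []
    for r in rows:
        k = key_of(r, keys)
        if k not in seen:
            seen.add(k)
            kept.append(r)
    return kept

print([r["duplicated"] for r in flag_duplicates(rows, ["ncf", "date"])])  # [True, True, False]
print(len(drop_duplicates(rows, ["ncf", "date"])))                       # 2
```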

In [None]:
### deciles
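from pyspark.sql.window import Window
import pyspark.sql.functions as F

# ntile(10) assigns each row a decile (1 to 10) by price. With no partitionBy, all rows are moved
# to a single partition, which is fine for small data but does not scale.
df1 = df1.select("Item_group", "Item_name", "Price",
                 F.ntile(10).over(Window.orderBy(df1["Price"])).alias("decile_rank"))
df1.show()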
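To see how `ntile` splits rows that do not divide evenly, here is a plain-Python sketch of its bucketing rule as I understand it (it matches the SQL-standard NTILE behaviour): every bucket gets `rows // n` rows and the first `rows % n` buckets get one extra. The function name and the sample prices are hypothetical.

```python
def ntile(values, n):
    """Assign bucket numbers 1..n to values in ascending order, like F.ntile(n).

    Every bucket gets len(values) // n rows; the first len(values) % n buckets
    get one extra row. Returns (value, bucket) pairs in sorted order.
    """
    ordered = sorted(values)
    size, extra = divmod(len(ordered), n)
    result, i = [], 0
    for bucket in range(1, n + 1):
        take = size + (1 if bucket <= extra else 0)
        for v in ordered[i:i + take]:
            result.append((v, bucket))
        i += take
    return result

print(ntile([5, 1, 9, 3, 7, 2, 8], 3))
# [(1, 1), (2, 1), (3, 1), (5, 2), (7, 2), (8, 3), (9, 3)]
```

With fewer rows than buckets (for example 3 prices and `ntile(10)`), only buckets 1 to 3 are used, so a "decile" column on a tiny table will not contain all ten values.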

In [None]:
# Joins1
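df3 = df1.join(df2, ['province','city'], how='left')
df3.limit(10).show()

# Joins2: inner join on column pairs matched by position (1st column of df1 with 1st of df2, and so on)
df1.join(df2, [df1[f] == df2[s] for (f, s) in zip(df1.columns, df2.columns)], "inner")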

In [None]:
df1.join(df2,(df1["col1"]==df2["col2"]) & (df1["col3"]==df2["col4"]),how = "left")

In [36]:
# Define schema

from pyspark.sql.types import *

custom_schema = StructType([
    StructField("emp_id", StringType()),
    StructField("name", StringType()),
    StructField("superior_emp_id", StringType()),
    StructField("year_joined", DateType()),
    StructField("emp_dept_id", StringType()),
    StructField("gender", StringType()),
    StructField("salary", IntegerType())])

In [38]:
emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

In [18]:
dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



In [64]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner") \
     .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [20]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter") \
    .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [None]:
# Broadcast
# Sometimes we need to join a very big table (~1B rows) with a very small table (~100–200 rows).

# A broadcast join is an important part of Spark's SQL execution engine. PySpark broadcasts the smaller DataFrame to all
# executors, and each executor keeps it in memory. The larger DataFrame stays split across the executors, so PySpark can
# perform the join without shuffling any data from the larger DataFrame, because the data required for the join is
# available on every executor.

from pyspark.sql.functions import broadcast
cases = big_df.join(broadcast(small_df), ['province','city'],how='left')
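The mechanism described above is essentially a hash join where the hash table is built from the small side and copied to every partition of the big side. A plain-Python sketch under stated assumptions (hypothetical data; the join key is unique in the small table, otherwise a real join would emit one row per match):

```python
# Small table keyed on (province, city); hypothetical data
small = [("ON", "Toronto", "Central"), ("BC", "Vancouver", "West")]

# Large table already split into partitions on different executors
big_partitions = [
    [("ON", "Toronto", 120), ("QC", "Montreal", 80)],
    [("BC", "Vancouver", 95), ("ON", "Toronto", 60)],
]

# "Broadcast": build one lookup table from the small side; every partition gets a copy
lookup = {(prov, city): region for prov, city, region in small}

def left_join_partition(part, lookup):
    """Left-join one partition of the big table locally; no big-table rows move."""
    return [(prov, city, cases, lookup.get((prov, city))) for prov, city, cases in part]

joined = [left_join_partition(part, lookup) for part in big_partitions]
for part in joined:
    print(part)
# [('ON', 'Toronto', 120, 'Central'), ('QC', 'Montreal', 80, None)]
# [('BC', 'Vancouver', 95, 'West'), ('ON', 'Toronto', 60, 'Central')]
```

Unmatched rows get `None`, which corresponds to the nulls a `how='left'` join produces.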

In [None]:
from pyspark.sql.functions import broadcast
df2 = largeDF.join(broadcast(smallerDF), smallerDF["code"] == largeDF["UNIT"])

In [None]:
# Use SQL With Data Frames
df.createOrReplaceTempView('df_table')  # registerTempTable is deprecated
new_df = spark.sql('select * from df_table where c1 > 100')
new_df.show()

In [50]:
# Spark Window Functions
from pyspark.sql.window import Window
import pyspark.sql.functions as F
# Rank employees by salary (highest first) within each department and keep the top two
windowSpec = Window.partitionBy('emp_dept_id').orderBy(F.desc('salary'))
empDF.withColumn("rank", F.rank().over(windowSpec)).filter(F.col("rank") <= 2).show()

+------+-----+---------------+-----------+-----------+------+------+----+
|emp_id| name|superior_emp_id|year_joined|emp_dept_id|gender|salary|rank|
+------+-----+---------------+-----------+-----------+------+------+----+
|     1|Smith|             -1|       2018|         10|     M|  3000|   1|
|     4|Jones|              2|       2005|         10|     F|  2000|   2|
|     2| Rose|              1|       2010|         20|     M|  4000|   1|
|     5|Brown|              2|       2010|         40|      |    -1|   1|
|     6|Brown|              2|       2010|         50|      |    -1|   1|
+------+-----+---------------+-----------+-----------+------+------+----+
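The window result above can be reproduced in plain Python to make the `rank()` semantics explicit: rows are grouped by department, sorted by salary descending, and tied salaries share a rank while the next rank is skipped (1, 1, 3). The rows below are taken from `empDF`, with `emp_dept_id` written as an integer for simplicity; `rank_within` is a hypothetical helper, not a Spark function.

```python
from itertools import groupby

# (emp_id, name, emp_dept_id, salary) from empDF above
emp = [
    (1, "Smith", 10, 3000), (2, "Rose", 20, 4000), (3, "Williams", 10, 1000),
    (4, "Jones", 10, 2000), (5, "Brown", 40, -1), (6, "Brown", 50, -1),
]

def rank_within(rows, part_key, order_key):
    """Like rank() over partitionBy(part_key).orderBy(desc(order_key))."""
    out = []
    for _, group in groupby(sorted(rows, key=part_key), key=part_key):
        ordered = sorted(group, key=order_key, reverse=True)
        for pos, row in enumerate(ordered, start=1):
            if pos > 1 and order_key(row) == order_key(ordered[pos - 2]):
                rank = out[-1][1]  # tie: reuse the previous row's rank
            else:
                rank = pos         # otherwise rank = position, so ties leave gaps
            out.append((row, rank))
    return out

ranked = rank_within(emp, part_key=lambda r: r[2], order_key=lambda r: r[3])
top2 = [(row[1], row[2], rank) for row, rank in ranked if rank <= 2]
print(top2)
# [('Smith', 10, 1), ('Jones', 10, 2), ('Rose', 20, 1), ('Brown', 40, 1), ('Brown', 50, 1)]
```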



In [25]:
# Create new column
import pyspark.sql.functions as F
empDF1 = empDF.withColumn("bonus", 20 * F.col("salary"))
empDF1.show()

# exponential
new_df = df.withColumn("exp_c1", F.exp("c1"))
new_df.show()

+------+--------+---------------+-----------+-----------+------+------+-----+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|bonus|
+------+--------+---------------+-----------+-----------+------+------+-----+
|     1|   Smith|             -1|       2018|         10|     M|  3000|60000|
|     2|    Rose|              1|       2010|         20|     M|  4000|80000|
|     3|Williams|              1|       2010|         10|     M|  1000|20000|
|     4|   Jones|              2|       2005|         10|     F|  2000|40000|
|     5|   Brown|              2|       2010|         40|      |    -1|  -20|
|     6|   Brown|              2|       2010|         50|      |    -1|  -20|
+------+--------+---------------+-----------+-----------+------+------+-----+



In [33]:
from pyspark.sql.functions import col,when
empDF2=empDF1.select([when(col(c)=="",None).otherwise(col(c)).alias(c) for c in empDF1.columns])
empDF2.show()


+------+--------+---------------+-----------+-----------+------+------+-----+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|bonus|
+------+--------+---------------+-----------+-----------+------+------+-----+
|     1|   Smith|             -1|       2018|         10|     M|  3000|60000|
|     2|    Rose|              1|       2010|         20|     M|  4000|80000|
|     3|Williams|              1|       2010|         10|     M|  1000|20000|
|     4|   Jones|              2|       2005|         10|     F|  2000|40000|
|     5|   Brown|              2|       2010|         40|  null|    -1|  -20|
|     6|   Brown|              2|       2010|         50|  null|    -1|  -20|
+------+--------+---------------+-----------+-----------+------+------+-----+



In [None]:
# UDF
import pyspark.sql.functions as F
from pyspark.sql.types import *
def HighLow(c1):
    # The UDF receives None for SQL NULLs; comparing None < 50 would raise a TypeError
    if c1 is None:
        return None
    if c1 < 50:
        return 'low'
    else:
        return 'high'

# Convert to a UDF by passing in the function and its return type
HighLowUDF = F.udf(HighLow, StringType())
df1 = df.withColumn("HighLow", HighLowUDF("c1"))
df1.show()