# PySpark Learning

In [2]:
import pyspark
from pyspark.sql import SparkSession


In [3]:
from pyspark.sql.functions import count

# Create a SparkContext that runs locally using all available cores (`local[*]`).

In [4]:
# Reuse the active SparkContext if one exists; otherwise start a local one
# with 'local[*]' as the master (one worker thread per available core).
# Calling the SparkContext constructor a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once", so getOrCreate keeps this cell
# safe to re-run.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))

# Create a SparkSession (the entry point to the DataFrame API).

In [5]:
# Build the notebook's SparkSession, or fetch the one that is already running.
spark = (
    SparkSession.builder
    .appName("Python Spark")
    .getOrCreate()
)

In [6]:
# Evaluate the session object so the notebook renders it as the cell output.
spark

# Create an RDD
 - RDD - Resilient Distributed Dataset.
 - RDDs are lazy: transformations only execute when an action is called.
 - RDDs are immutable; a lost partition can be recomputed from its lineage, which is what makes them fault tolerant.
 - RDDs are Spark's low-level API.
 - Use RDDs when you need the low-level API or the data is not structured; transformations and actions run in parallel.

In [8]:
# The integers 1..29 (the upper bound of range() is exclusive).
data = range(1,30)

# Turn the local range into an RDD using the SparkContext `sc`.
xrangeRDD = sc.parallelize(data)

# Show which concrete RDD class parallelize() returned.
type(xrangeRDD)

pyspark.rdd.PipelinedRDD

In [9]:
# Shift every element down by one, then keep only the values below 10.
# Both calls are transformations, so nothing is computed in this cell.
subRDD = xrangeRDD.map(lambda value: value - 1)
filteredRDD = subRDD.filter(lambda value: value < 10)

In [10]:
# collect() is an action: it runs the pending map/filter transformations and
# returns the surviving elements as a Python list.
filteredRDD.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [8]:
# Path of the CSV file, resolved relative to the notebook's working directory.
sales = "kickstarter_projects.csv"

# Read the CSV into a DataFrame: the first row is the header and column
# types are inferred from the data.
sales_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(sales)
)

In [12]:
# Number of projects per (country, state) pair, largest groups first.
count_sales_df = (
    sales_df
    .select("country", "state", "ID")
    .groupBy("country", "state")
    .agg(count("ID").alias("Total"))
    .orderBy("Total", ascending=False)
)

# Show the first 3 records

In [13]:
# Print the first 3 rows of the aggregated DataFrame as a text table.
count_sales_df.show(3)

+-------+----------+------+
|country|     state| Total|
+-------+----------+------+
|     US|    failed|151458|
|     US|successful|108815|
|     US|  canceled| 28232|
+-------+----------+------+
only showing top 3 rows



In [9]:
# Sample sentence, tokenised on single spaces; punctuation stays attached
# to its word (e.g. "spirit,").
sentence = "Spark makes life lot easier and put in good spirit, Spark is too Awesome"
word = sentence.split(" ")

In [12]:
# Distribute the word list as an RDD through the session's SparkContext.
word_rdd =  spark.sparkContext.parallelize(word)

In [14]:
# Bring every element of the RDD back to the driver as a Python list.
word_list = word_rdd.collect()

In [15]:
# Print the collected words, one per line.
for token in word_list:
    print(token)

Spark
makes
life
lot
easier
and
put
in
good
spirit,
Spark
is
too
Awesome


# Count the number of records

In [16]:
# count() is an action that returns the number of elements, duplicates included.
word_rdd.count()

14

# Count the distinct records

In [17]:
# distinct() drops duplicate elements before count() tallies what is left.
word_rdd.distinct().count()

13

In [18]:
# Keep the de-duplicated RDD under its own name so later cells can reuse it.
word_unique = word_rdd.distinct()
# Display the unique words; the recorded output is not in the input order.
word_unique.collect()

['good',
 'makes',
 'life',
 'lot',
 'spirit,',
 'Awesome',
 'Spark',
 'easier',
 'too',
 'put',
 'in',
 'and',
 'is']

# filter() keeps only the elements of an RDD that satisfy a predicate.
     - A record is collected only if the predicate returns True for it.

In [19]:
# Keep only the unique words whose first letter is a lowercase "i".
word_unique.filter(lambda token: token.startswith("i")).collect()

['in', 'is']

In [22]:
# Upper-case every word; map() produces exactly one output per input element.
word_rdd.map(lambda token: token.upper()).collect()

['SPARK',
 'MAKES',
 'LIFE',
 'LOT',
 'EASIER',
 'AND',
 'PUT',
 'IN',
 'GOOD',
 'SPIRIT,',
 'SPARK',
 'IS',
 'TOO',
 'AWESOME']

In [1]:
# flatMap() flattens the iterable returned for each element; returning the
# word itself therefore yields its individual characters.
# NOTE(review): the recorded NameError presumably comes from running this cell
# on a fresh kernel (execution count 1) before word_rdd was defined - re-run
# the notebook top to bottom to refresh the output.
word_rdd.flatMap(lambda x: x).collect()

NameError: name 'word_rdd' is not defined

In [26]:
# (name, value) pairs. Note the mixed value types: 1 is an int, "2" is a str.
country_name = [("India", 1), ("USA", "2")]

# Distribute the pairs as an RDD of 2-tuples.
county_rdd = spark.sparkContext.parallelize(country_name)

In [27]:
# sortByKey() orders the pairs by their first element (the country name);
# collect() returns the sorted pairs as a list.
sort_county = county_rdd.sortByKey().collect()

In [28]:
# Display the pairs sorted by country name.
sort_county

[('India', 1), ('USA', '2')]

In [31]:
# Swap each (name, value) pair to (value, name), casting the value to int so
# the int 1 and the str "2" compare numerically, then sort by that key.
coutry_s = (
    county_rdd
    .map(lambda pair: (int(pair[1]), pair[0]))
    .sortByKey()
    .collect()
)

coutry_s

[(1, 'India'), (2, 'USA')]

In [47]:
# NOTE(review): machine-specific absolute path; this cell only runs where the
# file exists at exactly this location.
file = r"C:\Users\rknav\OneDrive\Documents\Data set\employees_data.csv"

# header=True uses the first row as column names. inferSchema=True makes Spark
# detect column types; without it every column (including SALARY) is read as a
# string and numeric filters rely on implicit casts. This also matches how the
# kickstarter CSV is loaded earlier in the notebook.
emp_data = spark.read.csv(file, header=True, inferSchema=True)

In [53]:
# Show the employees whose SALARY is above 10000.
emp_data.where(emp_data["SALARY"] > 10000).show()

+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|EMAIL|PHONE_NUMBER|HIRE_DATE|JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+-------------+
+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+-------------+



In [None]:
# Spark has no built-in "xlsx" data source, so format("xlsx") fails with
# "Failed to find data source". Excel files are read through the spark-excel
# data source, which must be available on the Spark classpath (e.g. via
# spark.jars.packages) - TODO confirm it is installed in this environment.
# spark-excel selects the sheet through `dataAddress` ("'<sheet name>'!<cell>").
# NOTE(review): machine-specific absolute path.
superstore_data = (
    spark.read.format("com.crealytics.spark.excel")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("dataAddress", "'Superstore dataset'!A1")
    .load(r"C:\Users\rknav\OneDrive\Documents\Data set\Sales-Superstore-Dataset.xlsx")
)

In [9]:
# Persist the employee DataFrame as Parquet, replacing any previous output.
# save() needs a destination: calling it without a path fails for file-based
# formats. The path is relative to the notebook's working directory.
emp_data.write.format("parquet").mode("overwrite").save("employees_data.parquet")

In [8]:
# Shut down the session and its underlying SparkContext. Cells that use
# `spark` or `sc` will not work after this until the session is recreated.
spark.stop()