In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Test RDD").getOrCreate()
# getOrCreate() returns the existing SparkSession if there is one; otherwise it creates a new one with the application name "Test RDD".


In [2]:
type(spark) 

pyspark.sql.session.SparkSession

### Creating Rdd

In [3]:
# Using parallelize(): builds an RDD from data that already lives in the driver
# program (here a Python list) rather than from a stored file.

# parallelize() takes the data and, optionally, the number of splits (partitions);
# the number of splits is left at its default here.
rdd_par = spark.sparkContext.parallelize(["hi","hello","How u doing","bakwas"])
print(type(rdd_par))
print(rdd_par.collect())         # collect() is an action: it returns every element of the RDD as a Python list
print(rdd_par.count())           # count() is an action returning the number of records as an int, so it cannot be indexed; use first() or take(1) to get the first record

<class 'pyspark.rdd.RDD'>
['hi', 'hello', 'How u doing', 'bakwas']
4


In [4]:
# Creating an RDD through a transformation.
# filter() takes a predicate and keeps the elements for which it returns True.

begins_with_h = lambda text: text.startswith('H')
rdd_trans = rdd_par.filter(begins_with_h)
rdd_trans.collect()

['How u doing']

In [5]:
# Creating an RDD from a data source: textFile() reads the file into an RDD of
# strings. count() gives the number of elements and collect() returns them all.

rdd_ds = spark.sparkContext.textFile('shreyapractice.txt')
print(rdd_ds.count())
print(rdd_ds.collect())

4
['The Job output data between each step has to be stored in the distributed file system before the next step can begin.', 'Hence, this approach tends to be slow due to replication & disk storage.', 'Also, Hadoop solutions typically include clusters that are hard to set up and manage.', 'It also requires the integration of several tools for different big data use cases.']


In [6]:
#The map operation takes a Function, which is called for each value in the input stream and produces one result value, which is sent to the output stream.

#The flatMap operation takes a function that conceptually wants to consume one value and produce an arbitrary number of values.




# Count the total number of words in the text file.
# The flatMap is built once and reused. Calling words.collect() would return
# every word to the driver, but its result would not be displayed here (only the
# last expression of a cell is shown), so only the count action is run.
words = rdd_ds.flatMap(lambda line: line.split(' '))   # one element per word
words.count()                                          # total number of words in the file


63

In [7]:
# Count how many times each word occurs in the file.
# 1. Split every line into individual words with split().
# 2. Turn each word into a (word, 1) tuple.
# 3. Sum the 1s per word with reduceByKey().
word_rdd = rdd_ds.flatMap(lambda word:word.split(' '))

freq_words = word_rdd.map(lambda word: (word, 1))
#freq_words.collect()

# reduceByKey() works on an RDD of (key, value) tuples. For each key it combines
# the values pairwise with the supplied function (here a + b). There is no
# initial value of 0: the running result starts from the first value of that key.

freq_words.reduceByKey(lambda a,b : a+b).collect()



[('The', 1),
 ('Job', 1),
 ('output', 1),
 ('step', 2),
 ('stored', 1),
 ('in', 1),
 ('before', 1),
 ('begin.', 1),
 ('this', 1),
 ('approach', 1),
 ('tends', 1),
 ('due', 1),
 ('disk', 1),
 ('typically', 1),
 ('include', 1),
 ('clusters', 1),
 ('are', 1),
 ('set', 1),
 ('manage.', 1),
 ('It', 1),
 ('integration', 1),
 ('of', 1),
 ('several', 1),
 ('tools', 1),
 ('different', 1),
 ('use', 1),
 ('data', 2),
 ('between', 1),
 ('each', 1),
 ('has', 1),
 ('to', 4),
 ('be', 2),
 ('the', 3),
 ('distributed', 1),
 ('file', 1),
 ('system', 1),
 ('next', 1),
 ('can', 1),
 ('Hence,', 1),
 ('slow', 1),
 ('replication', 1),
 ('&', 1),
 ('storage.', 1),
 ('Also,', 1),
 ('Hadoop', 1),
 ('solutions', 1),
 ('that', 1),
 ('hard', 1),
 ('up', 1),
 ('and', 1),
 ('also', 1),
 ('requires', 1),
 ('for', 1),
 ('big', 1),
 ('cases.', 1)]

## Creating Dataframe

In [8]:
columns = ['currency', 'value']
inputdata = [('Euro', 90), ('Pound', 100), ('Yuan', 11), ('Yen', 2), ('Us Dollar', 74), ('k dinar', 242)]

# Creating a DataFrame from an RDD
rdd = spark.sparkContext.parallelize(inputdata)   # RDD of (currency, value) tuples
rddDF = rdd.toDF()                                # no column names are passed, so the columns show up as _1 and _2
rddDF.show()

+---------+---+
|       _1| _2|
+---------+---+
|     Euro| 90|
|    Pound|100|
|     Yuan| 11|
|      Yen|  2|
|Us Dollar| 74|
|  k dinar|242|
+---------+---+



In [9]:
# Rename the auto-generated column names.
# Only '_1' is renamed here, so the output still shows '_2'.

df = rddDF.withColumnRenamed('_1', "Currency")
# To rename the second column as well, chain from df (starting again from rddDF
# would discard the rename above): df = df.withColumnRenamed('_2', "Value")

df.show()


+---------+---+
| Currency| _2|
+---------+---+
|     Euro| 90|
|    Pound|100|
|     Yuan| 11|
|      Yen|  2|
|Us Dollar| 74|
|  k dinar|242|
+---------+---+



In [10]:
# Supply all column names in one step instead of renaming them one at a time.

unnamed_df = spark.createDataFrame(rdd)
df = unnamed_df.toDF(*columns)
df.show()

+---------+-----+
| currency|value|
+---------+-----+
|     Euro|   90|
|    Pound|  100|
|     Yuan|   11|
|      Yen|    2|
|Us Dollar|   74|
|  k dinar|  242|
+---------+-----+



In [11]:
# Build the DataFrame straight from the Python list of tuples; no RDD is created first.
df = spark.createDataFrame(inputdata, columns)
df.show()

+---------+-----+
| currency|value|
+---------+-----+
|     Euro|   90|
|    Pound|  100|
|     Yuan|   11|
|      Yen|    2|
|Us Dollar|   74|
|  k dinar|  242|
+---------+-----+



In [13]:
# Save the DataFrame as CSV. Spark writes a folder named "test" holding the
# part-files (the DataFrame was split into two parts in the original run).
# mode('overwrite') lets this cell be re-run: with the default save mode the
# write fails when the target path already exists. Note that any existing
# content at the target path is replaced.

df.write.format('csv').mode('overwrite').save('C:/Users/HP/Desktop/SPARK/test')

In [14]:
# Write the data as one part-file: repartition(1) collapses the DataFrame to a
# single partition before writing. header=True writes the column names as the
# first line. mode('overwrite') lets this cell be re-run: with the default save
# mode the write fails when the target path already exists.

df.repartition(1).write.format('csv').mode('overwrite').save('C:/Users/HP/Desktop/SPARK/test1', header = True)

In [15]:
# Save the DataFrame as a plain-text file.

df.rdd.map(lambda x: x[0] + "," + str(x[1])).repartition(1).saveAsTextFile('C:/Users/HP/Desktop/SPARK/text1')


# The DataFrame is converted to an RDD (df.rdd) and every row becomes one
# "currency,value" string; str() converts the integer value so that it can be
# concatenated. An RDD has no write.format(...) API, so saveAsTextFile() is used.
# NOTE(review): saveAsTextFile() is expected to fail when the target directory
# already exists -- confirm, and remove the directory before re-running.

In [16]:
# Read the text output back: every line becomes one row in a single column
# named 'value' (see the output of show()).

dftxt = spark.read.text("C:/Users/HP/Desktop/SPARK/text1")
dftxt.show()

+------------+
|       value|
+------------+
|     Euro,90|
|   Pound,100|
|     Yuan,11|
|       Yen,2|
|Us Dollar,74|
| k dinar,242|
+------------+



In [17]:
# Read the CSV folder "test1", which was written with header = True.

# Variant without the header option (the first line is then not used for column names):
#dfcsv = spark.read.csv("C:/Users/HP/Desktop/SPARK/test1")

dfcsv = spark.read.csv("C:/Users/HP/Desktop/SPARK/test1", header = True)
dfcsv.show()

+---------+-----+
| currency|value|
+---------+-----+
|     Euro|   90|
|    Pound|  100|
|     Yuan|   11|
|      Yen|    2|
|Us Dollar|   74|
|  k dinar|  242|
+---------+-----+



In [18]:
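# To read a table from MySQL (template: replace <host> with the server address).
# Credentials are read from environment variables rather than written into the
# notebook (needs `import os` when this block is enabled).
#dfmysql = spark.read.format('jdbc')\
#    .option("url", "jdbc:mysql://<host>/retail_db")\
#    .option("driver", "com.mysql.jdbc.Driver")\
#    .option("dbtable","orders")\
#    .option("user", os.environ["MYSQL_USER"])\
#    .option("password", os.environ["MYSQL_PASSWORD"])\
#    .load()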


In [9]:
# Creating a DataFrame from a data source: a CSV file read with the 'header'
# and 'inferSchema' options switched on.

summary_reader = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true')
df = summary_reader.load('2015-summary.csv')
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [10]:
# Performing operations on data

# `col` was listed twice in the original import; each name is imported once here.
from pyspark.sql.functions import col, expr, column, udf, date_sub, date_add, datediff, regexp_extract
from pyspark.sql.types import StringType, IntegerType


# Selecting the data of one column.

# Alternative ways of selecting columns:

#df.select(col('DEST_COUNTRY_NAME')).show(5, False)

#df.select(column('DEST_COUNTRY_NAME')).show(5, False)

#df.select('DEST_COUNTRY_NAME').show(5, False)

#df.select('DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME').show(5, False)

# expr() takes a SQL expression string; here it renames the column to "Destination".
df.select(expr('DEST_COUNTRY_NAME AS Destination')).show(5, False)
df.select('DEST_COUNTRY_NAME').show(5, False)


+-------------+
|Destination  |
+-------------+
|United States|
|United States|
|United States|
|Egypt        |
|United States|
+-------------+
only showing top 5 rows

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|United States    |
|United States    |
|United States    |
|Egypt            |
|United States    |
+-----------------+
only showing top 5 rows



In [11]:
# Add a boolean column that compares the origin country with the destination country.
same_country = col('ORIGIN_COUNTRY_NAME') == col('DEST_COUNTRY_NAME')
df.withColumn('withinCountry', same_country).show(5, False)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|United States    |Romania            |15   |false        |
|United States    |Croatia            |1    |false        |
|United States    |Ireland            |344  |false        |
|Egypt            |United States      |15   |false        |
|United States    |India              |62   |false        |
+-----------------+-------------------+-----+-------------+
only showing top 5 rows



In [12]:
# Adding column by user defined function

def computeGroup(count):
    """Map a count to a frequency label.

    Args:
        count: Numeric count, or None for a missing value.

    Returns:
        'Min' when count < 2, 'Normal' when count < 20, 'More' when
        count < 100, 'Busy' otherwise; None when count is None.
    """
    # Comparing None with an int raises TypeError in Python 3, so a missing
    # value is passed through as None instead of failing the whole job.
    if count is None:
        return None
    if count < 2:
        return 'Min'
    elif count < 20:
        return 'Normal'
    elif count < 100:
        return 'More'
    else:
        return 'Busy'

# Wrap computeGroup as a UDF declared to return a string, then apply it to the
# 'count' column to produce the 'Frequency' column.
group_udf = udf(computeGroup, StringType())
df.withColumn('Frequency', group_udf(col('count'))).show(5, False)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|Frequency|
+-----------------+-------------------+-----+---------+
|United States    |Romania            |15   |Normal   |
|United States    |Croatia            |1    |Min      |
|United States    |Ireland            |344  |Busy     |
|Egypt            |United States      |15   |Normal   |
|United States    |India              |62   |More     |
+-----------------+-------------------+-----+---------+
only showing top 5 rows



In [13]:
# Load movie.csv through the generic reader with the same two options set.
movie_reader = spark.read.format('csv').option('header', 'true').option('inferschema', 'true')
moviedf = movie_reader.load('movie.csv')
moviedf.show()

+-------+--------------------+--------------------+
|movieId|               title|                type|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [14]:
# Show the first five titles in full (no truncation).
moviedf.select('title').show(5, truncate=False)

+----------------------------------+
|title                             |
+----------------------------------+
|Toy Story (1995)                  |
|Jumanji (1995)                    |
|Grumpier Old Men (1995)           |
|Waiting to Exhale (1995)          |
|Father of the Bride Part II (1995)|
+----------------------------------+
only showing top 5 rows



In [15]:
# Extract the release year from the title. The pattern requires the four digits
# to be wrapped in parentheses, as in "Toy Story (1995)", so a four-digit number
# that is part of the title itself is not mistaken for the year.
# NOTE(review): titles without a "(YYYY)" group are expected to get a null Year
# after the cast and to be removed by na.drop() below -- confirm.
movieYear = moviedf.withColumn('Year',regexp_extract(col('title'), r"\((\d{4})\)", 1).cast(IntegerType()))
movieYear.show()

# Drop the rows that contain null values.
cleanedMovie = movieYear.na.drop()

+-------+--------------------+--------------------+----+
|movieId|               title|                type|Year|
+-------+--------------------+--------------------+----+
|      1|    Toy Story (1995)|Adventure|Animati...|1995|
|      2|      Jumanji (1995)|Adventure|Childre...|1995|
|      3|Grumpier Old Men ...|      Comedy|Romance|1995|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|1995|
|      5|Father of the Bri...|              Comedy|1995|
|      6|         Heat (1995)|Action|Crime|Thri...|1995|
|      7|      Sabrina (1995)|      Comedy|Romance|1995|
|      8| Tom and Huck (1995)|  Adventure|Children|1995|
|      9| Sudden Death (1995)|              Action|1995|
|     10|    GoldenEye (1995)|Action|Adventure|...|1995|
|     11|American Presiden...|Comedy|Drama|Romance|1995|
|     12|Dracula: Dead and...|       Comedy|Horror|1995|
|     13|        Balto (1995)|Adventure|Animati...|1995|
|     14|        Nixon (1995)|               Drama|1995|
|     15|Cutthroat Island ...|A

In [16]:
def calDecade(years):
    """Return the first year of the decade that `years` falls in.

    Args:
        years: A year as a number (e.g. 1995), or None for a missing value.

    Returns:
        The year rounded down to a multiple of 10 (1995 -> 1990);
        None when `years` is None.
    """
    # None % 10 raises TypeError, so a missing year is passed through as None.
    if years is None:
        return None
    return (years - years%10)

# Wrap calDecade as a UDF declared to return an integer and derive the 'Decade'
# column from 'Year'. The UDF already declares IntegerType as its return type,
# so the extra cast to IntegerType was redundant and has been dropped.
decadeudf = udf(calDecade, IntegerType())
movieDecade = cleanedMovie.withColumn('Decade', decadeudf(col('Year')))
movieDecade.show()



+-------+--------------------+--------------------+----+------+
|movieId|               title|                type|Year|Decade|
+-------+--------------------+--------------------+----+------+
|      1|    Toy Story (1995)|Adventure|Animati...|1995|  1990|
|      2|      Jumanji (1995)|Adventure|Childre...|1995|  1990|
|      3|Grumpier Old Men ...|      Comedy|Romance|1995|  1990|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|1995|  1990|
|      5|Father of the Bri...|              Comedy|1995|  1990|
|      6|         Heat (1995)|Action|Crime|Thri...|1995|  1990|
|      7|      Sabrina (1995)|      Comedy|Romance|1995|  1990|
|      8| Tom and Huck (1995)|  Adventure|Children|1995|  1990|
|      9| Sudden Death (1995)|              Action|1995|  1990|
|     10|    GoldenEye (1995)|Action|Adventure|...|1995|  1990|
|     11|American Presiden...|Comedy|Drama|Romance|1995|  1990|
|     12|Dracula: Dead and...|       Comedy|Horror|1995|  1990|
|     13|        Balto (1995)|Adventure|

In [17]:
# Keep only the rows whose Decade equals 1990.
movieDecade.where('Decade == 1990').show()

+-------+--------------------+--------------------+----+------+
|movieId|               title|                type|Year|Decade|
+-------+--------------------+--------------------+----+------+
|      1|    Toy Story (1995)|Adventure|Animati...|1995|  1990|
|      2|      Jumanji (1995)|Adventure|Childre...|1995|  1990|
|      3|Grumpier Old Men ...|      Comedy|Romance|1995|  1990|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|1995|  1990|
|      5|Father of the Bri...|              Comedy|1995|  1990|
|      6|         Heat (1995)|Action|Crime|Thri...|1995|  1990|
|      7|      Sabrina (1995)|      Comedy|Romance|1995|  1990|
|      8| Tom and Huck (1995)|  Adventure|Children|1995|  1990|
|      9| Sudden Death (1995)|              Action|1995|  1990|
|     10|    GoldenEye (1995)|Action|Adventure|...|1995|  1990|
|     11|American Presiden...|Comedy|Drama|Romance|1995|  1990|
|     12|Dracula: Dead and...|       Comedy|Horror|1995|  1990|
|     13|        Balto (1995)|Adventure|

## Date & Time

In [20]:
from pyspark.sql.functions import current_date, current_timestamp, date_sub, date_add, col, datediff, to_date,to_timestamp, lit, date_format

In [21]:
# Example 1: add the current date and the current timestamp to a range of ids.
id_range = spark.range(10)
with_today = id_range.withColumn('today', current_date())
dateDF = with_today.withColumn('now', current_timestamp())
dateDF.show(5, False)

+---+----------+-----------------------+
|id |today     |now                    |
+---+----------+-----------------------+
|0  |2021-01-28|2021-01-28 06:06:21.031|
|1  |2021-01-28|2021-01-28 06:06:21.031|
|2  |2021-01-28|2021-01-28 06:06:21.031|
|3  |2021-01-28|2021-01-28 06:06:21.031|
|4  |2021-01-28|2021-01-28 06:06:21.031|
+---+----------+-----------------------+
only showing top 5 rows

