In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q !wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [4]:
!pip install -q findspark

In [5]:
import findspark
findspark.init()

Creating Spark Session

In [6]:
import pyspark
from pyspark.sql import *
spark=SparkSession.builder.appName('Test').master('local[3]').enableHiveSupport().getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f05bf41c390>


Creating RDD. Count word in files using map, flatmap, reduceByKey

In [8]:
rdd=spark.sparkContext.textFile('/content/sample_file.txt')\
        .flatMap(lambda x:x.split(' '))\
        .map(lambda x:(x,1))\
        .reduceByKey(lambda x,y:x+y)

for (word, count) in rdd.collect():
          print("%s: %i" % (word, count))

refers: 1
large,: 1
diverse: 1
sets: 1
of: 4
: 6
at: 2
ever-increasing: 1
It: 1
velocity: 1
speed: 1
is: 1
created: 1
collected,: 1
scope: 1
points: 1
covered: 1
as: 1
"three: 1
v's": 1
mining: 1
in: 1
multiple: 1
formats.: 1
Big: 2
data: 4
to: 1
the: 6
information: 1
that: 1
grow: 1
rates.: 1
encompasses: 1
volume: 1
information,: 1
or: 2
which: 1
it: 1
and: 3
variety: 1
being: 1
(known: 1
big: 1
data).: 1
often: 1
comes: 1
from: 1
arrives: 1


Creating dataframe

In [20]:
stock_df = spark.read \
        .option('inferSchema', 'true') \
        .option('header', 'true') \
        .csv('/content/walmart_stock.csv')

print(type(stock_df))
stock_df.printSchema()
# stock_df.select(round('Adj Close',2)).show?()
stock_df.show()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         5

Handling bad records

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)
])
#.option("columnNameOfCorruptRecord", "_corrupt_record")\ PERMISSIVE, DROPMALFORMED
df1 = spark.read\
         .option("mode", "PERMISSIVE")\
         .schema(schema)\
         .option('header', 'true') \
         .csv('/content/bad_rec_file.csv')
df1.printSchema()
df1.show()

df2 = spark.read\
         .option("mode", "DROPMALFORMED")\
         .schema(schema)\
         .option('header', 'true') \
         .csv('/content/bad_rec_file.csv')
# df2.printSchema()
# df2.show()

df3 = spark.read\
         .option("mode", "FAILFAST")\
         .schema(schema)\
         .option('header', 'true') \
         .csv('/content/bad_rec_file.csv')
# df3.printSchema()
# df3.show()

Handling NULL values

In [None]:
df4 = spark.read \
        .option('inferSchema', 'true') \
        .option('header', 'true') \
        .csv('/content/null_val.csv')

df4.show()
print('Using isnotNull')
df4.filter((df4.Name.isNotNull())&(df4.Age.isNotNull())).show()
print('Using fill()')
df4.na.fill({'Name':'unknown','Age':""}).show()
print('Using drop()')
df4.na.drop().show()



**Join**

In [None]:
emp_df = spark.read \
        .option('inferSchema', 'true') \
        .option('header', 'true') \
        .csv('/content/emptbl.csv')

emp_df.show()

dept_df = spark.read \
        .option('inferSchema', 'true') \
        .option('header', 'true') \
        .csv('/content/depttbl.csv')

dept_df.show()
print("inner join")
emp_df.join(dept_df,emp_df.deptno == dept_df.deptno,"inner").show()
print("left join")
emp_df.join(dept_df,emp_df.deptno == dept_df.deptno,"left").show()
print("right join")
emp_df.join(dept_df,emp_df.deptno == dept_df.deptno,"right").show()
print("full join")
emp_df.join(dept_df,emp_df.deptno == dept_df.deptno,"full").show()

Shared Variables - Accumulator, broadcast

In [None]:
bdcast=spark.sparkContext.broadcast(['abc','pqr','iuo'])
data=bdcast.value[1]
print('broadcast value- ',data)

accum=spark.sparkContext.accumulator(10)
rdd=spark.sparkContext.parallelize([1,2,3,4])
rdd.foreach(lambda x:accum.add(x))
print('accumulator value- ',accum.value)

broadcast value-  pqr
accumulator value-  20


Repartiton and Coalesce

In [None]:
stock_df = spark.read \
        .option('inferSchema', 'true') \
        .option('header', 'true') \
        .csv('/content/walmart_stock.csv')

print('number of partition 1- ',stock_df.rdd.getNumPartitions())
stock_df = stock_df.repartition(6)
print('number of partition 2 after repartition-',stock_df.rdd.getNumPartitions())
stock_df = stock_df.coalesce(3)
print('number of partition 3 after coalesce-',stock_df.rdd.getNumPartitions())
stock_df.show()

Windows Function on Dataframe like Rank, Dense_rank, row_number

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number,dense_rank,rank, to_timestamp, to_date

emp_df = spark.read \
        .option('inferSchema', 'true') \
        .option('header', 'true') \
        .csv('/content/emptbl.csv')

# emp_df.show()
emp_df.printSchema()
windowSpec  = Window.partitionBy("deptno").orderBy("sal")

emp_df.withColumn("row_number",row_number().over(windowSpec)) \
      .withColumn("dense_rank",dense_rank().over(windowSpec))\
      .withColumn("rank",rank().over(windowSpec))\
      .show()

root
 |-- empno: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: integer (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: integer (nullable = true)
 |-- comm: integer (nullable = true)
 |-- deptno: integer (nullable = true)

+-----+------+---------+----+----------------+----+----+------+----------+----------+----+
|empno| ename|      job| mgr|        hiredate| sal|comm|deptno|row_number|dense_rank|rank|
+-----+------+---------+----+----------------+----+----+------+----------+----------+----+
| 7369| SMITH|    CLERK|7902|13-06-1993 00:00| 800|   0|    20|         1|         1|   1|
| 7876| ADAMS|    CLERK|7788|04-06-1999 00:00|1100|null|    20|         2|         2|   2|
| 7566| JONES|  MANAGER|7839|31-10-1995 00:00|2975|null|    20|         3|         3|   3|
| 7788| SCOTT|  ANALYST|7566|05-03-1996 00:00|3000|null|    20|         4|         4|   4|
| 7902|  FORD|  ANALYST|7566|05-12-1997 00:00|3000|null|   

Partition and Bucketing on Dataframe

In [None]:
emp_df = spark.read \
        .option('inferSchema', 'true') \
        .option('header', 'true') \
        .csv('/content/emptbl.csv')

# emp_df.show()
emp_df.printSchema()
emp_df.write\
      .partitionBy("deptno") \
      .mode("overwrite") \
      .csv("partitions/partitionBy.csv", header=True)

emp_df.write\
      .bucketBy(3, 'deptno') \
      .mode("overwrite") \
      .saveAsTable('bucketed', format='csv')





spark sql

In [None]:
emp_df = spark.read \
        .option('inferSchema', 'true') \
        .option('header', 'true') \
        .csv('/content/emptbl.csv')


spark.catalog.dropTempView("emp")
emp_df.createTempView("emp")
spark.sql('select * from emp').show()


Write parquet and ORC file

In [None]:
emp_df = spark.read \
        .option('inferSchema', 'true') \
        .option('header', 'true') \
        .csv('/content/emptbl.csv')


emp_df.write.mode('overwrite').parquet("parquet_output/emp.parquet")


emp_df.write.mode('overwrite').format("orc").save("orc/emp_orc.orc")


Remove duplicate records

In [None]:
emp_df = spark.read \
        .option('inferSchema', 'true') \
        .option('header', 'true') \
        .csv('/content/bad_rec_file.csv')

emp_df.show()
print('removing duplicates using drop_duplicates')
emp_df.drop_duplicates().show()
print('removing duplicates using distinct')
emp_df.distinct().show()

Reading JSON file

In [None]:
from pyspark.sql.functions import explode,col,from_json
from pyspark.sql.types import MapType,StringType
json_df=spark.read.option('multiline','true')\
.json('/content/sample_json.json')

json_df.show()
json_df.printSchema()
final_df=json_df.withColumn('contact no',explode('phoneNumbers'))\
                .withColumn('contact_no',col('contact no.number'))\
                .withColumn('contact_type',col('contact no.type'))\
                .drop('contact no','phoneNumbers')\
                .withColumn('city',col("address.city"))\
                .withColumn('state',col("address.state"))\
                .withColumn('streetAddress',col("address.streetAddress"))\
                .drop('address')

final_df.printSchema()
final_df.show()

+--------------------+---+---------+------+--------+--------------------+
|             address|age|firstName|gender|lastName|        phoneNumbers|
+--------------------+---+---------+------+--------+--------------------+
|[San Diego, CA, 101]| 28|      Joe|  male| Jackson|[[7349282382, home]]|
|[dfsdf dsfsdf, LA...| 35|      Dan|female|     Leo|[[854282352, offi...|
+--------------------+---+---------+------+--------+--------------------+

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- streetAddress: string (nullable = true)
 |-- age: long (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- phoneNumbers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- number: string (nullable = true)
 |    |    |-- type: string (nullable = true)

root
 |-- age: long (nullable = true)
 |