In [3]:
import findspark
findspark.init('/usr/spark2.4.3')

import pyspark # only run this after findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

spark = SparkSession.builder.appName("PysparkExample").getOrCreate()
sc = spark.sparkContext

print(spark.sparkContext)
print("Spark App Name : "+ spark.sparkContext.appName)

<SparkContext master=local[*] appName=PysparkExample>
Spark App Name : PysparkExample


In [4]:
df = spark.read.csv("pyspark/inputfile/zipcodes.csv")
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)



In [3]:
df2 = spark.read.option("header",True) \
     .csv("pyspark/inputfile/zipcodes.csv")
df2.printSchema()

root
 |-- RecordNumber: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Xaxis: string (nullable = true)
 |-- Yaxis: string (nullable = true)
 |-- Zaxis: string (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: string (nullable = true)
 |-- TaxReturnsFiled: string (nullable = true)
 |-- EstimatedPopulation: string (nullable = true)
 |-- TotalWages: string (nullable = true)
 |-- Notes: string (nullable = true)



In [5]:

df3 = spark.read.options(header='True', delimiter=',') \
  .csv("pyspark/inputfile/zipcodes.csv")
df3.printSchema()

schema = StructType() \
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("ZipCodeType",StringType(),True) \
      .add("City",StringType(),True) \
      .add("State",StringType(),True) \
      .add("LocationType",StringType(),True) \
      .add("Lat",DoubleType(),True) \
      .add("Long",DoubleType(),True) \
      .add("Xaxis",IntegerType(),True) \
      .add("Yaxis",DoubleType(),True) \
      .add("Zaxis",DoubleType(),True) \
      .add("WorldRegion",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("LocationText",StringType(),True) \
      .add("Location",StringType(),True) \
      .add("Decommisioned",BooleanType(),True) \
      .add("TaxReturnsFiled",StringType(),True) \
      .add("EstimatedPopulation",IntegerType(),True) \
      .add("TotalWages",IntegerType(),True) \
      .add("Notes",StringType(),True)
      
df_with_schema = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("pyspark/inputfile/zipcodes.csv")
df_with_schema.printSchema()

df2.write.option("header",True) \
 .csv("pyspark/outputfile/zipcodes123")

root
 |-- RecordNumber: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Xaxis: string (nullable = true)
 |-- Yaxis: string (nullable = true)
 |-- Zaxis: string (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: string (nullable = true)
 |-- TaxReturnsFiled: string (nullable = true)
 |-- EstimatedPopulation: string (nullable = true)
 |-- TotalWages: string (nullable = true)
 |-- Notes: string (nullable = true)

root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: 

AnalysisException: 'path hdfs://cxln1.c.thelab-240901.internal:8020/user/saravananmsk1536/pyspark/outputfile/zipcodes123 already exists.;'

In [17]:
df2 = spark.read.option("header",True) \
     .csv("pyspark/inputfile/zipcodes.csv")
df2.where(df2['City'] =='MESA').select('RecordNumber','City','State').limit(10).show()
df2.where(df2['City'] == 'MESA').select('RecordNumber', 'City', 'State').limit(10).show()


+------------+----+-----+
|RecordNumber|City|State|
+------------+----+-----+
|       39827|MESA|   AZ|
|       39828|MESA|   AZ|
+------------+----+-----+

+------------+----+-----+
|RecordNumber|City|State|
+------------+----+-----+
|       39827|MESA|   AZ|
|       39828|MESA|   AZ|
+------------+----+-----+



In [21]:
df2.withColumn("State", lit("ABC"))
df2.withColumn("City", lit("DEF"))
df2.where(df2['City'] =='DEF').show()

+------------+-------+-----------+----+-----+------------+---+----+-----+-----+-----+-----------+-------+------------+--------+-------------+---------------+-------------------+----------+-----+
|RecordNumber|Zipcode|ZipCodeType|City|State|LocationType|Lat|Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|LocationText|Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|Notes|
+------------+-------+-----------+----+-----+------------+---+----+-----+-----+-----+-----------+-------+------------+--------+-------------+---------------+-------------------+----------+-----+
+------------+-------+-----------+----+-----+------------+---+----+-----+-----+-----+-----------+-------+------------+--------+-------------+---------------+-------------------+----------+-----+

