# Learn PySpark

![](https://miro.medium.com/v2/1*E27zNNFMwTvXj_XbO2RY-Q.png)

In [0]:
import requests

# Download the sample CSV and store it in the Unity Catalog volume used by the next cells.
SAMPLE_CSV_URL = "https://public.tableau.com/app/sample-data/mobile_os_usage.csv"
data = requests.get(SAMPLE_CSV_URL, timeout=60)
# Fail loudly on an HTTP error instead of writing an error page into the .csv file.
data.raise_for_status()
# dbutils.fs.put refuses to replace an existing file unless overwrite=True, which would break a re-run.
dbutils.fs.put("/Volumes/workspace/default/volume/mobile_os_usage.csv", data.text, overwrite=True)

In [0]:
# List the files currently stored in the volume (the same listing `%fs ls` renders).
display(dbutils.fs.ls("/Volumes/workspace/default/volume"))

In [0]:
# Duplicate the downloaded CSV under a new name (equivalent to `%fs cp`).
dbutils.fs.cp(
    "/Volumes/workspace/default/volume/mobile_os_usage.csv",
    "/Volumes/workspace/default/volume/mobile_os_usage4.csv",
)

In [0]:
# Notebook parameters: "db" accepts a listed choice or free text; "tbl1" is restricted to its list.
dbutils.widgets.combobox(
    name="db",
    defaultValue="default",
    choices=["Database1", "Database2", "Database3", "tbl1", "tbl2", "tbl3"],
)
dbutils.widgets.dropdown(
    name="tbl1",
    defaultValue="mobile_os_usage",
    choices=["mobile_os_usage", "mobile_os_usage1", "mobile_os_usage2"],
)

In [0]:
# Make a second copy of the downloaded dataset.
# mobile_os_usage1.csv is never written by this notebook (the earlier copy targets
# mobile_os_usage4.csv), so copying from it fails on a fresh run. Copy from the file
# the download cell is guaranteed to have created instead.
dbutils.fs.cp(
    "/Volumes/workspace/default/volume/mobile_os_usage.csv",
    "/Volumes/workspace/default/volume/mobile_os_usage2.csv",
)

In [0]:
%sql
-- Create the demo table on first run, add two rows, then show the table contents.
-- NOTE(review): INSERT INTO appends, so every re-run of this cell adds these rows again.
CREATE TABLE IF NOT EXISTS cities (id INT, city STRING);
INSERT INTO cities (id, city) VALUES (3, 'Mumbai'), (4, 'Lucknow');
SELECT * FROM cities;

In [0]:
%sql
-- Spark SQL also accepts the FROM clause ahead of SELECT.
FROM cities SELECT *

In [0]:
%sql
-- Print the CREATE TABLE statement for the cities table.
SHOW CREATE TABLE cities

In [0]:
SOURCE_DIR = "/Volumes/workspace/wd36_schema/ingestion_volume/source"
CUST_COLUMNS = ["custid", "fname", "lname", "age", "prof"]

# No header option: every column is read as a string with a positional default name.
df1 = spark.read.csv(f"{SOURCE_DIR}/cust")
# The first line of the file supplies the column names.
df2 = spark.read.csv(f"{SOURCE_DIR}/cust_header", header=True)
# Custom delimiter plus type inference, then give the positional columns real names.
df3 = spark.read.csv(f"{SOURCE_DIR}/cust_other_delim", sep="~", inferSchema=True).toDF(*CUST_COLUMNS)
# samplingRatio only affects schema inference, so it must be paired with inferSchema=True.
# It is kept in its own variable so the named, typed df3 used by the filter below is not replaced
# (re-reading into df3 without .toDF(...) left no `age` column, and df3.age raised).
df3_sampled = spark.read.csv(
    f"{SOURCE_DIR}/cust_other_delim", sep="~", inferSchema=True, samplingRatio=0.0001
).toDF(*CUST_COLUMNS)

df3.where(df3.age > 40).show()

In [0]:
# Same delimited read, written with the generic format()/option()/load() builder.
cust_reader = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("sep", "~")
)
df3 = cust_reader.load(
    "/Volumes/workspace/wd36_schema/ingestion_volume/source/cust_other_delim"
).toDF("custid", "fname", "lname", "age", "prof")
df3.show(6)

In [0]:
# Same read again, passing all reader options in one options(**dict) call.
delim_read_opts = {"sep": "~", "inferSchema": "true"}
df3 = (
    spark.read.options(**delim_read_opts)
    .csv("/Volumes/workspace/wd36_schema/ingestion_volume/source/cust_other_delim")
    .toDF("custid", "fname", "lname", "age", "prof")
)
df3.show(6)

In [0]:
# Supply the schema as a DDL string, so columns are named and typed up front instead of inferred.
tbl_struct = "id integer, fname string, lname string, age integer, prof string"
df3 = spark.read.schema(tbl_struct).csv(
    "/Volumes/workspace/wd36_schema/ingestion_volume/source/cust_other_delim", sep="~"
)
# printSchema() prints by itself and returns None; wrapping it in print() emitted a stray "None".
df3.printSchema()
df3.show(6)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Programmatic equivalent of a DDL schema string: StructField(name, type, nullable).
custom_schema = StructType([
    # NOTE(review): file sources may report every column as nullable regardless of this flag; verify with printSchema().
    StructField("custid", IntegerType(), False),
    StructField("fname", StringType(), True),
    StructField("lname", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("prof", StringType(), True),
])
df3 = spark.read.schema(custom_schema).csv(
    "/Volumes/workspace/wd36_schema/ingestion_volume/source/cust_other_delim", sep="~"
)
# printSchema() prints by itself and returns None; don't wrap it in print().
df3.printSchema()
df3.show(1)
display(df3.take(2))

In [0]:
MALFORMED_PATH = "/Volumes/workspace/wd36_schema/ingestion_volume/source/cust_malformed"
# Options shared by the three parse-mode demos; only `mode` differs between them.
malformed_read_opts = {"header": True, "sep": "~", "quote": "'"}

# PERMISSIVE: malformed rows are kept (unparseable fields become null).
df3 = spark.read.options(mode="PERMISSIVE", **malformed_read_opts).csv(MALFORMED_PATH)
display(df3)

# DROPMALFORMED: malformed rows are dropped.
df3 = spark.read.options(mode="dropMalformed", **malformed_read_opts).csv(MALFORMED_PATH)
display(df3)

# FAILFAST: the first malformed row aborts the job. The error surfaces when display() runs the
# query, so catch it here and show it as the demo output, letting "Run All" continue.
df3 = spark.read.options(mode="failFast", **malformed_read_opts).csv(MALFORMED_PATH)
try:
    display(df3)
except Exception as err:  # demo only: FAILFAST is expected to raise on malformed input
    first_line = next(iter(str(err).splitlines()), "")
    print(f"FAILFAST raised {type(err).__name__}: {first_line}")

In [0]:
struct = "id integer, fname string, lname string, age integer, prof string"
# Every CSV reader option for this demo in one place.
cust_csv_opts = {
    "header": True,
    "sep": "~",
    "mode": "permissive",
    "quote": "'",
    "escape": "\\",
    "multiline": True,
    "ignoreLeadingWhiteSpace": True,
    "ignoreTrailingWhiteSpace": True,
    "nullValue": "Karen",  # the literal text Karen is read as null
    "nanValue": 74,  # NOTE(review): NaN parsing targets floating-point columns, and this schema has none
    "lineSep": "\n",
}
df3 = (
    spark.read.schema(struct)
    .options(**cust_csv_opts)
    .csv("/Volumes/workspace/wd36_schema/ingestion_volume/source/cust_malformed")
)
display(df3)

In [0]:
# Lenient JSON parsing for a hand-edited file: unquoted keys, single-quoted strings,
# comments, and records spanning several lines.
df3 = spark.read.options(
    allowUnquotedFieldNames=True,
    allowSingleQuotes=True,
    # Spark's option is "allowComments"; the previous "allowcomment" is unknown and was silently ignored.
    allowComments=True,
    multiLine=True,
    lineSep="\n",
).json("/Volumes/workspace/wd36_schema/ingestion_volume/json/cust_malformed.json")
display(df3)

In [0]:
REGION_SOURCE_DIR = "/Volumes/workspace/wd36_schema/ingestion_volume/source"
# "overwrite" is a write mode, not a parse mode; Spark falls back to PERMISSIVE for an unknown
# parse mode, so state PERMISSIVE explicitly.
region_read_opts = {"header": True, "sep": "~", "mode": "PERMISSIVE"}

dftn = spark.read.options(**region_read_opts).csv(f"{REGION_SOURCE_DIR}/cust_malformed_TN")
dfka = spark.read.options(**region_read_opts).csv(f"{REGION_SOURCE_DIR}/cust_malformed_KA")

# Reading several files at once via a glob. mergeSchema is a Parquet/ORC option with no effect on
# CSV, so it is omitted; CSV does not reconcile differing headers the way unionByName does.
# NOTE(review): this glob also matches cust_malformed_clensing, which a later cell reads from the
# same directory; confirm that file should be part of this merge.
df_merge = spark.read.options(**region_read_opts).csv(f"{REGION_SOURCE_DIR}/cust_malformed_*")
display(df_merge)

# unionByName aligns columns by name; allowMissingColumns fills columns absent on one side with null.
df_unionbyname = dftn.unionByName(dfka, allowMissingColumns=True)
display(df_unionbyname)

In [0]:
# Write df3 as CSV with every field quoted. The empty .options() call was a no-op and is dropped.
# Spark writes a directory of part files at this path, despite the ".csv" suffix.
# NOTE(review): df3 here comes from the JSON read; nested JSON columns cannot be written as CSV — confirm its schema is flat.
df3.write.options(quoteAll=True).mode("overwrite").csv(
    "/Volumes/workspace/wd36_schema/ingestion_volume/source/cust_quoteall.csv"
)
df3.show()

In [0]:
# JSON output from two partitions, so up to two part files.
(
    df3.repartition(2)
    .write
    .format("json")
    .mode("overwrite")
    .save("/Volumes/workspace/default/volume/json")
)

In [0]:
# ORC output from two partitions.
(
    df3.repartition(2)
    .write
    .format("orc")
    .mode("overwrite")
    .save("/Volumes/workspace/default/volume/orc")
)

In [0]:
# Parquet output. WORM - Write Once Read Many.
df3.write.format("parquet").mode("overwrite").save("/Volumes/workspace/default/volume/parquet")

In [0]:
# Delta output. WMRM - Write Many Read Many.
df3.write.mode("overwrite").format("delta").save("/Volumes/workspace/default/volume/delta")

In [0]:
target_table = "workspace.default.mobile_os_usage"
# Replace the table with df3, then append the same rows again, so the table ends with df3's rows twice.
df3.write.mode("overwrite").saveAsTable(target_table)
# insertInto matches columns by position; safe here because the table was just created from df3.
df3.write.mode("append").insertInto(target_table)
display(spark.table(target_table))


In [0]:
%sh
# Disk usage of the root filesystem on the machine running this shell, in human-readable units.
df --human-readable /

### Rejection & Retention

In [0]:
# The extra string column receives the raw text of rows that fail to parse against the schema.
struct = "id integer, fname string, lname string, age integer, prof string, corrupted_record string"
# columnNameOfCorruptRecord is populated in PERMISSIVE mode. The previous "overwrite" is not a parse
# mode and only worked because Spark falls back to PERMISSIVE.
df1 = (
    spark.read.schema(struct)
    .options(header=True, sep="~", mode="PERMISSIVE", columnNameOfCorruptRecord="corrupted_record")
    .csv("/Volumes/workspace/wd36_schema/ingestion_volume/source/cust_malformed_clensing")
)
display(df1)
# Rejection: rows whose raw text was captured as corrupt.
regdf1 = df1.filter(df1.corrupted_record.isNotNull())
display(regdf1)
# Retain: rows that parsed cleanly.
retdf1 = df1.filter(df1.corrupted_record.isNull())
display(retdf1)

### Cleansing

In [0]:
struct = "id integer, fname string, lname string, age integer, prof string"
# "overwrite" is not a parse mode (Spark silently fell back to PERMISSIVE); say PERMISSIVE explicitly.
df2 = (
    spark.read.schema(struct)
    .options(header=True, sep="~", mode="PERMISSIVE")
    .csv("/Volumes/workspace/wd36_schema/ingestion_volume/source/cust_malformed_clensing")
)
display(df2)
# Cleansing: drop rows with nulls in the key columns.
# how="any": drop the row if id OR age is null.
clndf1 = df2.na.drop(how="any", subset=["id", "age"])
display(clndf1)
# how="all": drop the row only if id AND age are both null.
clndf2 = df2.na.drop(how="all", subset=["id", "age"])
display(clndf2)

### Scrubbing

In [0]:
# Scrubbing: default missing ages to 0.
scrbdf1 = clndf2.fillna({"age": 0})
display(scrbdf1)
# Rename the "Pilot" profession to "Captain", touching only the prof column.
# NOTE(review): this starts from clndf2, not scrbdf1, so the age fill above is not carried forward; confirm intended.
replcevalue = {"Pilot": "Captain"}
scrbdf2 = clndf2.replace(replcevalue, subset=["prof"])
display(scrbdf2)

### Deduplication

In [0]:
# Whole-row duplicates: rows identical in every column collapse to one.
deldf1 = scrbdf2.distinct()
display(deldf1)
# Key-based duplicates: keep one row per id (Spark does not promise which of the duplicates survives).
deldf2 = scrbdf2.drop_duplicates(["id"])
display(deldf2)

### Standardization - add column

In [0]:
from pyspark.sql.functions import lit, initcap, col

# Tag every customer row with a constant "source" value.
stddf1 = spark.read.csv("/Volumes/workspace/wd36_schema/ingestion_volume/source/cust_header", header=True)
stddf2 = stddf1.withColumn("source", lit("Retail"))
display(stddf2.take(10))

### Standardization - column uniformity

In [0]:
# Column uniformity: normalize profession casing BEFORE grouping so that variants like
# "pilot" and "PILOT" land in the same group. Applying initcap after groupBy only relabeled
# the already-separate groups, which left duplicate-looking rows with split counts.
stdUnifdf1 = (
    stddf2.withColumn("profession", initcap(col("profession")))
    .groupBy("profession")
    .count()
    .orderBy("profession", ascending=True)
)
display(stdUnifdf1)

### Standardization - data Standardization

In [0]:
# Re-read the raw customer file and list profession values that contain a digit (candidates for cleanup).
datastddf1 = spark.read.csv("/Volumes/workspace/wd36_schema/ingestion_volume/source/cust_header", header=True)
display(datastddf1.take(10))
datastddf1.filter(datastddf1["profession"].rlike("[0-9]")).show()