In [0]:
from pyspark.sql import SparkSession

# Bind `spark` explicitly so the notebook does not depend on a session being
# pre-injected by the hosting environment. getOrCreate() returns the active
# session when one already exists, so this is safe in both situations.
spark = SparkSession.builder.getOrCreate()

In [0]:
# Read the JSON file into a DataFrame; no schema is passed here, so the
# column types are left for Spark to infer.
df = spark.read.json("/FileStore/tables/people.json")

In [0]:
# Print the first rows as a text table.
df.show()

In [0]:
# Print the column names and types of the DataFrame's schema.
df.printSchema()

In [0]:
# Column names of the DataFrame.
df.columns

In [0]:
# describe() returns a new DataFrame of summary statistics; without .show()
# this cell displays only the DataFrame object, not the statistics.
df.describe()

In [0]:
# Summary statistics restricted to the 'age' column, rendered with .show().
df.describe('age').show()

In [0]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

In [0]:
# Explicit schema for people.json. Each StructField is
# (column name, data type, nullable); nullable=True allows missing values.
age_field = StructField('age', IntegerType(), nullable=True)
name_field = StructField('name', StringType(), nullable=True)

data_schema = [age_field, name_field]
final_struct = StructType(fields=data_schema)

In [0]:
# Re-read the same file, this time passing the explicit schema held in
# final_struct instead of leaving the types to inference.
df = spark.read.json("/FileStore/tables/people.json", schema = final_struct)

In [0]:
# Check that the schema now reflects final_struct.
df.printSchema()

In [0]:
df.show()

In [0]:
# Indexing a DataFrame by name yields a Column expression, not the data.
df['age']

In [0]:
type(df['age'])

In [0]:
df.select('age') # select() returns a DataFrame holding only the requested column

In [0]:
type(df.select('age'))

In [0]:
# Because select() gives back a DataFrame, .show() can be chained to print it.
df.select('age').show()

In [0]:
# head(n) returns the first n rows as a list.
df.head(2)

In [0]:
# Index into that list to get a single row.
df.head(2)[0]

In [0]:
type(df.head(2)[0])

In [0]:
# Pass a list of names to select several columns at once.
df.select(['age','name']).show()

In [0]:
# withColumn adds (or replaces) a column and returns a NEW DataFrame; it is
# not an in-place operation, so the result has to be assigned to a name.
df_new = df.withColumn('newage',df['age']*2)

In [0]:
# The original DataFrame is unchanged ...
df.show()

In [0]:
# ... while the derived one carries the extra 'newage' column.
df_new.show()

In [0]:
# Renaming a column is shown on the returned DataFrame only; the result is
# not assigned, so df keeps its 'age' column.
df.withColumnRenamed('age', 'newage').show()

In [0]:
# Register the DataFrame as a temporary view named 'people' so that it can be
# queried with SQL through spark.sql().
df.createOrReplaceTempView('people')

In [0]:
results = spark.sql("SELECT * from people")

In [0]:
results.show()

In [0]:
# Row filtering expressed as a SQL WHERE clause.
new_results= spark.sql("SELECT * FROM people WHERE age = 30")

In [0]:
new_results.show()

In [0]:
# Load the stock CSV. Note that this rebinds `df`, replacing the people data.
# header=True: the very first row of the CSV file holds the column names.
# inferSchema=True: column types are inferred from the data.
df = spark.read.csv('/FileStore/tables/appl_stock.csv', inferSchema = True, header = True)

In [0]:
df.show()

In [0]:
df.printSchema()

In [0]:
# Third of the first three rows (index 2).
df.head(3)[2]

In [0]:
# filter() accepts a SQL-style condition string ...
df.filter("Close < 500").show()

In [0]:
# Chain select() to keep only some columns of the filtered rows.
df.filter("Close < 500").select(['Open','Close']).show()

In [0]:
# ... or an equivalent Column expression built with Python operators.
df.filter(df['Close'] < 500).select('Volume').show()

In [0]:
# Combine conditions with & (and); each condition is wrapped in its own
# parentheses.
df.filter((df['Close'] < 200) & (df['Open'] > 200)).show()

In [0]:
df.filter((df['Close'] < 200) & ~(df['Open'] > 200)).show()  # ~ negates a condition (not)

In [0]:
# NOTE(review): exact equality against a floating-point literal; this only
# matches rows whose stored value is exactly 197.16.
df.filter(df['Low'] == 197.16).show()

In [0]:
# NOTE(review): display() is neither defined nor imported in this notebook;
# presumably it is supplied by the hosting notebook environment - confirm.
display(df.filter(df['Low'] == 197.16))

Date,Open,High,Low,Close,Volume,Adj Close
2010-01-22,206.780006,207.499996,197.16,197.75,220441900,25.620401


In [0]:
# collect() returns the matching rows as a list instead of printing them.
df.filter(df['Low'] == 197.16).collect()

In [0]:
result = df.filter(df['Low'] == 197.16).collect() # keep the collected rows in a variable for reuse

In [0]:
display(result)

Date,Open,High,Low,Close,Volume,Adj Close
2010-01-22,206.780006,207.499996,197.16,197.75,220441900,25.620401


In [0]:
# Inspect what collect() handed back: first the container, then one element.
type(result)

In [0]:
type(result[0])

In [0]:
# Convert the first collected row into a plain dict keyed by column name.
row = result[0]
row.asDict()

In [0]:
# Look up a single field of that row by its column name.
row.asDict()['Volume']

In [0]:
# Load the sales CSV (header row gives the column names, types are inferred).
# This rebinds `df` again; the stock data is no longer reachable through it.
df = spark.read.csv("/FileStore/tables/sales_info.csv", inferSchema = True, header = True)

In [0]:
df.show()

In [0]:
df.printSchema()

In [0]:
# groupby() on its own only yields a grouped object; an aggregation such as
# mean/sum/count/max has to follow to obtain a DataFrame.
df.groupby('Company')

In [0]:
# Per-company aggregates of the Sales column.
df.groupby('Company').mean('Sales').show()

In [0]:
df.groupby('Company').sum('Sales').show()

In [0]:
# Number of rows per company.
df.groupby('Company').count().show()

In [0]:
# agg() on the ungrouped DataFrame aggregates over all rows; the dict maps
# a column name to the name of the aggregate function.
df.agg({'Sales' : 'sum'}).show()  # total of all sales

In [0]:
df.agg({'Sales':'max'}).show()

In [0]:
# The grouped object can be kept in a variable and aggregated later.
group_data = df.groupby("Company")

In [0]:
group_data.agg({"Sales":"Max"}).show()  # groupby followed by agg: same idea as groupby(...).max('Sales') below

In [0]:
df.groupby('Company').max('Sales').show()

In [0]:
from pyspark.sql.functions import countDistinct, avg, stddev

# Aggregate functions can be used inside select(); this one counts the
# distinct values found in the Sales column.
df.select(countDistinct('Sales')).show()


In [0]:
# alias() renames the aggregate's output column. The label says
# 'Average Sales', so the aggregate computed is avg('Sales'); a distinct
# count under this label would be misleading.
df.select(avg('Sales').alias('Average Sales')).show()

In [0]:
# Standard deviation of Sales, exposed under the column name 'standard'.
sales_std = df.select(stddev('Sales').alias('standard'))

In [0]:
from pyspark.sql.functions import format_number
# Format the value to 2 decimal places. No alias is given here, so the output
# column keeps whatever name Spark generates for the expression.
sales_std.select(format_number('standard', 2)).show()

In [0]:
# The alias can be applied last, after formatting, to name the final column.
sales_std.select(format_number('standard', 2).alias('std')).show()

In [0]:
df.orderBy("Sales").show() # ascending order: pass the column name as a string

In [0]:
df.orderBy(df["Sales"].desc()).show() # descending order: pass the Column with .desc() applied

In [0]:
# Spark DataFrames: missing data
# Load a CSV that contains null values (this rebinds `df` once more).
df = spark.read.csv("/FileStore/tables/ContainsNull.csv", header = True, inferSchema = True)

In [0]:
df.show()

In [0]:
# Drop: called with no arguments, na.drop() removes every row that contains
# at least one null value.

df.na.drop().show()

In [0]:
# thresh counts NON-null values: rows with at least 2 non-null values are
# kept, rows with fewer than 2 non-null values are dropped.
df.na.drop(thresh = 2).show()

In [0]:
df.na.drop(how = 'any').show() # drop a row if any of its columns is null

In [0]:
df.na.drop(how = 'all').show() # drop a row only if all of its columns are null

In [0]:
df.na.drop(subset = ["Sales"]).show() # only consider the Sales column: drop rows where Sales is null

In [0]:
# Fill
# Check the column types first: na.fill() only affects columns whose type is
# compatible with the fill value.
df.printSchema()

In [0]:
df.na.fill('Fill Value').show()  # string fill value: only string-typed columns are filled

In [0]:
df.na.fill(0).show() # numeric fill value: only numeric-typed columns are filled

In [0]:
# subset restricts the fill to the named columns.
df.na.fill('No Name', subset = ["Name"]).show()

In [0]:
df.na.fill(0, subset = ["Sales"]).show()

In [0]:
from pyspark.sql.functions import mean
# Compute the mean of Sales and bring the result back with collect(),
# which returns a list of rows.
mean_val = df.select(mean(df["Sales"])).collect()

In [0]:
mean_val

In [0]:
# First (and, for a single aggregate, only) row of the result.
mean_val[0]

In [0]:
# First field of that row: the mean value itself.
mean_val[0][0]

In [0]:
mean_Sales = mean_val[0][0]

In [0]:
# Replace null Sales values with the column mean; the second positional
# argument is the subset of columns to fill.
df.na.fill(mean_Sales,["Sales"]).show()

In [0]:
# The same fill written as a single expression, computing the mean inline.
df.na.fill(df.select(mean(df["Sales"])).collect()[0][0],subset = ["Sales"]).show()

In [0]:
# Dates and timestamps
# Reload the stock CSV (rebinding `df`) and peek at the first row.
df = spark.read.csv('/FileStore/tables/appl_stock.csv', inferSchema = True, header = True)
df.head(1)

In [0]:
df.show()

In [0]:
# Narrow the view to the date column and one price column.
df.select(['Date','Open']).show()

In [0]:
from pyspark.sql.functions import dayofmonth, hour, dayofyear, month, year,weekofyear, format_number, date_format


In [0]:
df.select(dayofmonth(df["date"])).show()

In [0]:
df.select(hour(df["date"])).show()

In [0]:
df.select(month(df["date"])).show()

In [0]:
newdf = df.withColumn("Year", year(df["date"]))

In [0]:
newdf.groupby('Year').mean('Close').show()

In [0]:
result = newdf.groupby('Year').mean().select(['Year', 'avg(Close)'])

In [0]:
result.select('Year', format_number('avg(Close)', 2).alias('average close price')).show()