In [0]:
from pyspark.sql.types import *

# Schema for a simple people dataset; every field is declared non-nullable.
_people_fields = [
  ('name', StringType()),
  ('age', IntegerType()),
  ('city', StringType()),
]
people_schema = StructType(
  [StructField(field_name, field_type, False) for field_name, field_type in _people_fields]
)

In [0]:
from pyspark.sql import functions as F
# Load the 2017 AA DFW departures, replace "Destination Airport" with a
# lower-cased "airport" column, and preview the result.
_aa_raw = (
    spark.read.format("csv")
    .options(Header=True)
    .load("/FileStore/tables/AA_DFW_2017_Departures_Short-5.csv")
)
_aa_with_airport = _aa_raw.withColumn("airport", F.lower(_aa_raw["Destination Airport"]))
aa_dfw_df = _aa_with_airport.drop(_aa_with_airport["Destination Airport"])
aa_dfw_df.show()

In [0]:
# Combine the 2016 and 2017 AA DFW departure extracts, rename the columns,
# persist the result as Parquet, and read the Parquet copy back.
AA_DFW_PARQUET_PATH = "/tmp/output/AA_DFW_ALL.parquet"

df1 = spark.read.format("csv").options(Header=True).load("/FileStore/tables/AA_DFW_2016_Departures_Short.csv")
df2 = spark.read.format("csv").options(Header=True).load("/FileStore/tables/AA_DFW_2017_Departures_Short-5.csv")
# union() matches columns by position, so both files are assumed to share the same header order.
df3 = df1.union(df2)
# Replace spaces/parentheses in the names (presumably because Spark's Parquet writer
# rejects such characters in column names on the Spark version in use).
for old_name, new_name in [
    ("Date (MM/DD/YYYY)", "Date"),
    ("Flight Number", "Flight_Number"),
    ("Actual elapsed time (Minutes)", "Actual_elapsed_time"),
    ("Destination Airport", "Destination_Airport"),
]:
    df3 = df3.withColumnRenamed(old_name, new_name)
# The write used to be commented out, so the read below failed on a fresh cluster.
# mode("overwrite") keeps the cell safe to re-run.
df3.write.mode("overwrite").parquet(AA_DFW_PARQUET_PATH)
df = spark.read.format("parquet").load(AA_DFW_PARQUET_PATH)

count_2016, count_2017, count_all = df1.count(), df2.count(), df.count()
print(count_2016)
print(count_2017)
print(count_all)
# union() keeps duplicates (UNION ALL), so the combined file must hold every input row.
assert count_all == count_2016 + count_2017, "union/Parquet round-trip lost or duplicated rows"

In [0]:
# Expose the combined flights Parquet data as the SQL view "flights" and
# select the flights with an elapsed time under 100 minutes.
# Actual_elapsed_time came from a CSV read without a schema, so it is a string
# column; the comparison relies on Spark's implicit cast.
flights_df = spark.read.parquet("/tmp/output/AA_DFW_ALL.parquet")
flights_df.createOrReplaceTempView("flights")
_short_flights_query = "SELECT * FROM flights WHERE Actual_elapsed_time < 100"
short_flights_df = spark.sql(_short_flights_query)

In [0]:
from pyspark.sql.functions import length
# Load the Dallas council voters. Keep only VOTER_NAME values of 1-19 characters
# that contain no underscore, and show the distinct names before and after.
def _show_distinct_voter_names(frame):
  """Display the distinct VOTER_NAME values of `frame`."""
  frame.select("VOTER_NAME").distinct().show()

voter_df = (
    spark.read.format("csv")
    .options(Header=True)
    .load("/FileStore/tables/DallasCouncilVoters.csv")
)
_show_distinct_voter_names(voter_df)
voter_df = voter_df.filter("length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20")
voter_df = voter_df.where(~voter_df["VOTER_NAME"].contains("_"))
_show_distinct_voter_names(voter_df)

In [0]:
# Derive first_name / last_name from VOTER_NAME by splitting it on runs of whitespace.
# r'\s+' is a raw string: in a plain string '\s' is an invalid escape (warning on newer Pythons).
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))
# Index with [] instead of getItem(Column); PySpark deprecates Column keys in getItem since 3.0.
voter_df = voter_df.withColumn('last_name', voter_df.splits[F.size('splits') - 1])
voter_df = voter_df.drop('splits')
voter_df.show()

In [0]:
# Councilmembers get a uniform random value in [0, 1). Every other title gets null,
# because there is no otherwise() clause. The fixed seed makes the values reproducible
# across runs, given the same data partitioning.
voter_df = voter_df.withColumn(
    'random_val',
    F.when(voter_df.TITLE == 'Councilmember', F.rand(seed=42)),
)
voter_df.show()

In [0]:
# Assign random_val by title:
#   - a seeded uniform random value in [0, 1) for Councilmembers,
#   - 2 for the Mayor,
#   - 0 for everyone else, including rows with a null TITLE.
# The fixed seed keeps the values reproducible across runs, given the same partitioning.
voter_df = voter_df.withColumn(
    'random_val',
    F.when(voter_df.TITLE == 'Councilmember', F.rand(seed=42))
    .when(voter_df.TITLE == 'Mayor', 2)
    .otherwise(0),
)
voter_df.show()
voter_df.filter(voter_df.random_val == 0).show()

In [0]:
def getFirstAndMiddle(names):
  """Join every name token except the last with single spaces.

  Used as a Spark UDF over the whitespace-split VOTER_NAME array.

  Args:
    names: list of name tokens, or None when the array column is null.

  Returns:
    The tokens before the last one joined by ' ' ('' for a one-token or empty
    list), or None when `names` is None. This makes a null input yield a null
    output instead of raising TypeError and failing the Spark task.
  """
  if names is None:
    return None
  return ' '.join(names[:-1])
# Wrap getFirstAndMiddle as a string-returning UDF and use it to add
# first_and_middle_name. The temporary `splits` column is dropped afterwards.
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())
# Raw string so '\s' reaches the regex engine without Python's invalid-escape warning.
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))
voter_df = voter_df.drop(voter_df.splits)
voter_df.show()

In [0]:
# Reduce voter_df to its distinct voter names and print how many there are.
# Then tag each row with a monotonically increasing id (unique, not consecutive)
# and show the rows with the highest ids first.
voter_df = voter_df.select("VOTER_NAME").distinct()
distinct_voter_count = voter_df.count()
print(distinct_voter_count)
voter_df = voter_df.withColumn("ROW_ID", F.monotonically_increasing_id())
voter_df.orderBy(F.desc("ROW_ID")).show()

In [0]:
# Print the current partition count. monotonically_increasing_id encodes the
# partition id in its upper bits, so the count explains the ROW_ID gaps.
# Then regenerate ROW_ID, replacing the existing column, and show by descending id.
partition_count = voter_df.rdd.getNumPartitions()
print(partition_count)
voter_df = voter_df.withColumn("ROW_ID", F.monotonically_increasing_id())
voter_df.orderBy(F.desc("ROW_ID")).show()

In [0]:
## dataframe not available

In [0]:
## dataframe not available
import time
# Time two counts of a cached DataFrame. cache() is lazy, so the first count
# materializes the cache and the second is expected to be served from it.
# time.perf_counter() is a monotonic, high-resolution clock. It is better suited to
# elapsed-time measurement than time.time(), which jumps if the wall clock is adjusted.
start_time = time.perf_counter()
df = voter_df.distinct().cache()
print(df.count(), time.perf_counter() - start_time)

start_time = time.perf_counter()
print(df.count(), time.perf_counter() - start_time)

In [0]:
def _print_cache_state(frame):
  """Print whether `frame` is currently marked as cached."""
  print("Is df cached?: %s" % frame.is_cached)

# Show the cache flag before and after releasing the cached data.
_print_cache_state(df)
df.unpersist()
_print_cache_state(df)

In [0]:
## dataframe not available
# Time a full row count of a plain CSV file and of a gzip-compressed file.
# NOTE(review): the two paths look like different datasets ("Voters" vs "Votes"),
# so this is not a like-for-like compressed-vs-uncompressed comparison; confirm intent.
# Gzip files are not splittable, so Spark reads df2 in a single task.
df1 = spark.read.csv("/FileStore/tables/DallasCouncilVoters.csv")
df2 = spark.read.csv("/FileStore/tables/DallasCouncilVotes_csv.gz")

# perf_counter() is monotonic, unlike time.time(), so the durations cannot go negative or jump.
start_time = time.perf_counter()
print(df1.count())
print(" Time to run for df1 %f" %(time.perf_counter()-start_time))

start_time = time.perf_counter()
print(df2.count())
print(" Time to run for df2 %f" %(time.perf_counter()-start_time))


In [0]:
# Read three session settings: the application name, the driver's TCP port,
# and the number of partitions used for SQL shuffles.
app_name, driver_tcp_port, num_partitions = (
    spark.conf.get(conf_key)
    for conf_key in ("spark.app.name", "spark.driver.port", "spark.sql.shuffle.partitions")
)
print(app_name, driver_tcp_port, num_partitions)

In [0]:
## dataframe not available
# Compare two partition counts:
#   - `df`, the distinct voter DataFrame from the caching cell;
#   - a freshly shuffled (distinct) read, taken after raising
#     spark.sql.shuffle.partitions to 500.
# NOTE: the conf change is session-wide and stays in effect for every later cell.
# NOTE(review): with adaptive query execution enabled, Spark may coalesce shuffle
# partitions, so `after` can be below 500. Confirm on the target cluster.
before = df.rdd.getNumPartitions()
spark.conf.set('spark.sql.shuffle.partitions', 500)
df1 = spark.read.csv("/FileStore/tables/DallasCouncilVotes_csv.gz").distinct()
after = df1.rdd.getNumPartitions()
print(before, after, sep="\n")

In [0]:
# Load airports and flights (header rows used as column names) and inner-join each
# flight to its destination airport without an explicit broadcast hint.
# Spark may still pick a broadcast join on its own if airports_df is under
# spark.sql.autoBroadcastJoinThreshold. explain() prints the chosen physical plan.
airports_df = spark.read.csv("/FileStore/tables/airports.csv", header=True)
flights_df = spark.read.csv("/FileStore/tables/flights.csv", header=True)
_dest_matches_faa = flights_df["dest"] == airports_df["faa"]
normal_df = flights_df.join(airports_df, _dest_matches_faa)
normal_df.explain()

In [0]:
from pyspark.sql.functions import broadcast
# Same flights-to-airports join, but with an explicit broadcast hint on airports_df
# (presumably the smaller side). explain() shows the resulting plan.
broadcast_df = flights_df.join(
    broadcast(airports_df),
    on=flights_df["dest"] == airports_df["faa"],
)
broadcast_df.explain()

In [0]:
# Time a full count of the plain join and of the broadcast-hinted join.
# NOTE(review): nothing is cached, so each count re-reads both CSVs. Run order and
# warm file caches can skew the comparison.
# perf_counter() is monotonic and high-resolution, unlike time.time().
start_time = time.perf_counter()
normal_count = normal_df.count()
normal_duration = time.perf_counter() - start_time

start_time = time.perf_counter()
broadcast_count = broadcast_df.count()
broadcast_duration = time.perf_counter() - start_time

print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))

In [0]:
# Keep airports whose column at position 3 is positive, tag each row with a unique id,
# and write the result as JSON, replacing any earlier output.
# NOTE(review): df[3] selects the column by position, and CSV columns are strings here,
# so "> 0" relies on Spark's implicit cast. Confirm which column index 3 should be.
# 'output.json' is a relative path, resolved against the default filesystem's working directory.
_airports_raw = spark.read.csv("/FileStore/tables/airports.csv", header=True)
df = (
    _airports_raw
    .filter(_airports_raw[3] > 0)
    .withColumn('id', F.monotonically_increasing_id())
)
df.write.mode('overwrite').json('output.json')

In [0]:
# Count three things in the votes file, read with sep='|':
#   - all lines;
#   - the lines whose first column starts with '#';
#   - the lines remaining when the CSV reader's comment='#' option skips comment lines.
# sep='|' presumably keeps each whole line in _c0; later cells split _c0 on ','.
_votes_path = "/FileStore/tables/DallasCouncilVotes_csv.gz"
votes_df = spark.read.csv(_votes_path, sep='|')
full_count = votes_df.count()
comment_count = votes_df.where(votes_df['_c0'].startswith('#')).count()

no_comments_df = spark.read.csv(_votes_path, sep='|', comment='#')
no_comments_count = no_comments_df.count()

print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

In [0]:
## Dataset not available
# Add `colcount` to votes_df: the number of comma-separated fields in the raw line.
# Then count the rows that have at least 5 fields.
# NOTE(review): a plain split on ',' ignores CSV quoting, so quoted fields that
# contain commas inflate colcount.
initial_count = votes_df.count()
tmp_fields = F.split(votes_df['_c0'], ',')
votes_df = votes_df.withColumn('colcount', F.size(tmp_fields))
df_filtered = votes_df.where(votes_df["colcount"] >= 5)
final_count = df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

In [0]:
## Dataset not available
# Split each raw line into the named council-vote fields, one column per position,
# and keep the full split array as `split_cols` for the trailing-field UDF.
# Start from df_filtered (rows with at least 5 fields, from the previous cell), not the
# unfiltered votes_df. Otherwise short rows and '#' comment lines, which votes_df was
# read without skipping, flow into split_df.
annotations_df = df_filtered
split_cols = F.split(annotations_df["_c0"], ',')
_vote_field_names = [
    'DATE', 'AGENDA_ITEM_NUMBER', 'ITEM_TYPE', 'DISTRICT', 'TITLE', 'VOTER_NAME',
    'VOTE_CAST', 'FINAL_ACTION_TAKEN', 'AGENDA_ITEM_DESC', 'AGENDA_ID', 'VOTE_ID',
]
split_df = annotations_df
# Field i of the split line becomes the column named _vote_field_names[i].
for field_index, field_name in enumerate(_vote_field_names):
    split_df = split_df.withColumn(field_name, split_cols.getItem(field_index))
split_df = split_df.withColumn('split_cols', split_cols)
split_df.show()

In [0]:
def retriever(cols, colcount, start=11):
  """Return the trailing fields of a split council-vote line.

  Positions 0-10 of the split line are already extracted into named columns
  (DATE ... VOTE_ID, with VOTE_ID at index 10), so the trailing fields begin at 11.
  The old slice started at 10 and wrongly repeated VOTE_ID.

  Args:
    cols: list of field strings from the split line, or None when the array is null.
    colcount: number of fields in the line, used as the end index of the slice.
    start: index of the first trailing field. Defaults to 11, the position right
      after VOTE_ID.

  Returns:
    cols[start:colcount] as a list, or None when `cols` is None. This avoids a
    TypeError that would fail the Spark task.
  """
  if cols is None:
    return None
  return cols[start:colcount]

# Wrap `retriever` as a UDF returning array<string> and attach its output as
# `dog_list`. The display copy drops the raw line and helper columns; split_df keeps them.
udfRetriever = F.udf(retriever, ArrayType(StringType()))

split_df = split_df.withColumn(
    'dog_list',
    udfRetriever(split_df['split_cols'], split_df['colcount']),
)
splited_df = split_df.drop('_c0', 'split_cols', 'colcount')
splited_df.show()

In [0]:
# Re-read the raw votes file and use each raw line as a "folder" key. Inner-join
# split_df to it, with a broadcast hint on the key frame, and compare row counts.
# NOTE(review): this replaces votes_df from earlier cells with a fresh read that has
# neither comment='#' nor colcount. Joining raw line to raw line multiplies rows whose
# line text repeats. Confirm this join key is intended.
votes_df = spark.read.csv("/FileStore/tables/DallasCouncilVotes_csv.gz", sep='|')
valid_folders_df = votes_df.withColumnRenamed('_c0', 'folder')
split_df = split_df.withColumnRenamed('_c0', 'folder')

split_count = split_df.count()
joined_df = split_df.join(F.broadcast(valid_folders_df), on="folder")
joined_df.show()
joined_count = joined_df.count()

print("Before: %d\nAfter: %d" % (split_count, joined_count))

In [0]:
# Find split_df rows whose folder has no match in joined_df (left anti join).
# Report the row counts and the number of distinct unmatched folders.
split_count = split_df.count()
joined_count = joined_df.count()
invalid_df = split_df.join(F.broadcast(joined_df), on='folder', how='left_anti')
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

invalid_folder_count = (
    invalid_df
    .select('folder')
    .distinct()
    .count()
)
print("%d distinct invalid folders found" % invalid_folder_count)

In [0]:
# Preview the extracted trailing fields. DataFrame.show() prints the table itself
# and returns None, so the former print(...) wrapper only added a stray "None" line.
joined_df.select('dog_list').show(10, truncate=False)

# Schema for one dog record: a breed name plus start/end x and y integer
# coordinates, all declared non-nullable.
DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False),
])

In [0]:
## dataset not available

In [0]:
## dataset not available