In [0]:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import concat, col, lit, to_timestamp, to_date, hour

print("Setup complete")

In [0]:
df_traffic = spark.read.option("header","true").csv("/FileStore/tables/2015_2020.csv").drop('_c0')
df_traffic = df_traffic.filter(df_traffic["1"]=='BEOGRAD')
df_traffic = df_traffic.select(col("0").alias("id"),col("1").alias("city"),col("2").alias("municipality")\
                               ,col("3").alias("timestamp"),col("4").alias("latitude"),col("5").alias("longitude")\
                              ,col("6").alias("outcome"),col("7").alias("participants"),col("8").alias("description"))
display(df_traffic)

id,city,municipality,timestamp,latitude,longitude,outcome,participants,description
1106801,BEOGRAD,BARAJEVO,"01.10.2015,12:05",20.4180857555,44.5788977355,Sa mat.stetom,SN SA JEDNIM VOZILOM,Nezgode sa učešćem jednog vozila i preprekama na ili iznad kolovoza
1106289,BEOGRAD,BARAJEVO,"03.10.2015,12:00",20.4134764628,44.5927879951,Sa povredjenim,SN SA NAJMANjE DVA VOZILA – BEZ SKRETANjA,Najmanje dva vozila koja se kreću u istom smeru – sustizanje
1106903,BEOGRAD,BARAJEVO,"08.10.2015,20:00",20.3885608869,44.5544174799,Sa mat.stetom,SN SA JEDNIM VOZILOM,Nezgoda sa jednim vozilom – silazak sa kolovoza u krivini
1106338,BEOGRAD,BARAJEVO,"11.10.2015,20:40",20.4472969411,44.6445427093,Sa mat.stetom,SN SA JEDNIM VOZILOM,Nezgode sa učešćem jednog vozila i preprekama na ili iznad kolovoza
1106747,BEOGRAD,BARAJEVO,"14.10.2015,19:10",20.412899,44.577052,Sa mat.stetom,SN SA NAJMANjE DVA VOZILA – BEZ SKRETANjA,Ostale nezgode sa učešćem najmanje dva vozila bez skretanja (bez podataka o smeru)
1106725,BEOGRAD,BARAJEVO,"14.10.2015,19:15",20.437784,44.646534,Sa mat.stetom,SN SA NAJMANjE DVA VOZILA – BEZ SKRETANjA,Ostale nezgode sa učešćem najmanje dva vozila bez skretanja (bez podataka o smeru)
1106421,BEOGRAD,BARAJEVO,"17.10.2015,20:30",20.308034,44.593901,Sa mat.stetom,SN SA PARKIRANIM VOZILIMA,Sudar sa parkiranim vozilom sa desne strane kolovoza
1107460,BEOGRAD,BARAJEVO,"17.10.2015,12:00",20.31733,44.577778,Sa mat.stetom,SN SA NAJMANjE DVA VOZILA – BEZ SKRETANjA,Najmanje dva vozila koja se kreću u istom smeru – preticanje
1107463,BEOGRAD,BARAJEVO,"25.10.2015,16:30",20.2964159473,44.5908153184,Sa mat.stetom,SN SA NAJMANjE DVA VOZILA – BEZ SKRETANjA,Ostale nezgode sa najmanje dva vozila – suprotni smerovi bez skretanja
1108118,BEOGRAD,BARAJEVO,"25.10.2015,01:20",20.4099177149,44.618464171,Sa povredjenim,SN SA NAJMANjE DVA VOZILA – BEZ SKRETANjA,Najmanje dva vozila koja se kreću u istom smeru – sustizanje


In [0]:
replaces = [("Sa poginulim","death"),("Sa povredjenim","injury"),("Sa mat.stetom","damage")]
for exp,replace_exp in replaces:
  df_traffic = df_traffic.withColumn("outcome",regexp_replace(col("outcome"),exp,replace_exp).alias("outcome"))

display(df_traffic.select("outcome").distinct())


outcome
death
injury
damage


In [0]:
replaces = [("PARK","parked vehicle"),("NAJMANjE DVA","two or more vehicles"),("JEDNIM","one vehicle"),("ACIMA","pedestrian")]
for exp,replace_exp in replaces:
  exp = ".*("+exp+").*"
  df_traffic = df_traffic.withColumn("participants",regexp_replace(col("participants"),exp,replace_exp).alias("participants"))

display(df_traffic.select("participants").distinct())
df_traffic.show(5)

participants
pedestrian
one vehicle
parked vehicle
two or more vehicles


In [0]:
df_traffic = df_traffic.withColumn("timestamp", to_timestamp(df_traffic.timestamp, 'dd.MM.yyyy,HH:mm'))
df_traffic = df_traffic.withColumn("latitude", col("latitude").cast("double"))
df_traffic = df_traffic.withColumn("longitude", col("longitude").cast("double"))
df_traffic.printSchema()
df_traffic.show(5)

In [0]:
df_traffic.registerTempTable("accidents")
df_traffic = spark.sql("""select id, city, municipality, timestamp,
                  case
                    when hour(timestamp)>=4 and hour(timestamp)<7 then "early morning"
                    when hour(timestamp)>=7 and hour(timestamp)<11 then "morning"
                    when hour(timestamp)>=11 and hour(timestamp)<14 then "midday"
                    when hour(timestamp)>=14 and hour(timestamp)<18 then "afternoon"
                    when hour(timestamp)>=18 and hour(timestamp)<20 then "early evening"
                    when hour(timestamp)>=20 and hour(timestamp)<23 then "evening"
                    when hour(timestamp)>=23 or hour(timestamp)<1 then "midnight"
                    else "after midnight"
                  end as day_time,
                  latitude, longitude, outcome, participants, description
                  from accidents """)
df_traffic.registerTempTable("accidents")
df_traffic.show(10)
df_traffic.printSchema()

In [0]:
df_summary = spark.read.format('csv').option('header','true').load('/FileStore/tables/sum.csv')
df_observations = spark.read.format('csv').option('header','true').load('/FileStore/tables/obs.csv')

df_summary.printSchema()
df_observations.printSchema()

In [0]:
df_summary = df_summary.withColumn('date', to_date(df_summary.date)).drop('_c0').distinct()
  
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
df_observations = df_observations.select(to_timestamp(col('timestamp')).alias('timestamp')\
                                                                 ,col('temperature'),col('humidity'),col('wind'),col('pressure'),col('condition'))

df_observations.show(5)
df_summary.show(5)

In [0]:
df = df_observations
df = df.select(df.timestamp, df.temperature.cast("double"),df.humidity.cast("double"),df.pressure.cast("double"),df.condition)
df.show(3)
df_observations = df

df = df_summary
df = df.select(df.date, df['high temp'].cast("double"),df['high temp historic'].cast("double"),\
               df['low temp'].cast("double"),df['low temp historic'].cast("double"),df['average'].cast("double"),\
               df['average historic'].cast("double"),df['max wind'].cast("double"),df['pressure'].cast("double"),df.sunrise,df.sunset)
df = df.select(df.date, df['high temp'].alias('high'), df['high temp historic'].alias('high_historic'), df['low temp'].alias('low'),\
              df['low temp historic'].alias('low_historic'), df.average, df['average historic'].alias('average_historic'),\
               df['max wind'].alias('max_wind'), df.pressure,df.sunrise,df.sunset)
df.show(3)
df_summary = df

In [0]:
df_summary.registerTempTable("summary")
df_observations.registerTempTable("observations")

In [0]:
%sql
drop table acc_obs;
create table acc_obs
select a.id, a.municipality, a.timestamp as accident, a.day_time, a.latitude, a.longitude, a.participants, a.outcome, w.timestamp as observation, w.condition, w.temperature, w.humidity, w.pressure
from accidents as a
inner join observations as w
on (date(a.timestamp)=date(w.timestamp) and (
(hour(a.timestamp)=hour(w.timestamp) and (
(hour(a.timestamp)=23 and minute(a.timestamp)>=30 and minute(w.timestamp)>=30) or
(minute(a.timestamp)<=15 and minute(w.timestamp)<=15) or 
(minute(a.timestamp)>15 and minute(a.timestamp)<45 and minute(w.timestamp)>15 and minute(w.timestamp)<45))) or
(minute(a.timestamp)>=45 and minute(w.timestamp)<15 and hour(w.timestamp)-hour(a.timestamp)=1)))

In [0]:
%scala
import com.mongodb.spark._
import spark.implicits._
import org.apache.spark.sql.functions._

val df_a = table("accidents")
val df_s = table("summary")
val df_o = table("observations")
val df_ao = table("acc_obs")
df_a.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").option("database", "belgrade_15-20").option("collection", "accidents").save()
df_s.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").option("database", "belgrade_15-20").option("collection", "summary").save()
df_o.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").option("database", "belgrade_15-20").option("collection", "observations").save()
df_ao.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").option("database", "belgrade_15-20").option("collection", "accidents_observations").save()