# Setting up our Schema

Spark can infer a schema for CSV files automatically, but ours have no header row, so we build the schema by hand from the column names listed in `features.txt`:

In [1]:
import datetime
from pyspark.sql.types import StructType, StructField, FloatType, LongType, StringType

# Build the schema from features.txt (one column name per line).
feats = []
with open('features.txt') as f:  # the context manager closes the file for us
    for line_num, line in enumerate(f):
        name = line.strip()
        if line_num == 0:
            # Timestamp
            feats.append(StructField(name, LongType(), True))
        elif line_num == 1:
            # Geohash
            feats.append(StructField(name, StringType(), True))
        else:
            # Other features
            feats.append(StructField(name, FloatType(), True))

schema = StructType(feats)

print(schema)


StructType(List(StructField(Timestamp,LongType,true),StructField(Geohash,StringType,true),StructField(geopotential_height_lltw,FloatType,true),StructField(water_equiv_of_accum_snow_depth_surface,FloatType,true),StructField(drag_coefficient_surface,FloatType,true),StructField(sensible_heat_net_flux_surface,FloatType,true),StructField(categorical_ice_pellets_yes1_no0_surface,FloatType,true),StructField(visibility_surface,FloatType,true),StructField(number_of_soil_layers_in_root_zone_surface,FloatType,true),StructField(categorical_freezing_rain_yes1_no0_surface,FloatType,true),StructField(pressure_reduced_to_msl_msl,FloatType,true),StructField(upward_short_wave_rad_flux_surface,FloatType,true),StructField(relative_humidity_zerodegc_isotherm,FloatType,true),StructField(categorical_snow_yes1_no0_surface,FloatType,true),StructField(u-component_of_wind_tropopause,FloatType,true),StructField(surface_wind_gust_surface,FloatType,true),StructField(total_cloud_cover_entire_atmosphere,FloatType,tru

# Creating a Dataframe

Let's load our tab-separated files into a DataFrame, Spark's abstraction for working with tabular data (built on top of RDDs).

In [2]:
# from pyspark.storagelevel import StorageLevel
# spark.conf.set("spark.sql.broadcastTimeout", 36000)
# spark.conf.set("spark.sql.broadcastTimeout", 1200)
# df = spark.read.format('csv').option('sep', '\t').schema(schema).load('/Volumes/evo/Datasets/NAM_2015_S/*')
# df = spark.read.format('csv').option('sep', '\t').schema(schema).load('hdfs://orion11:37000/nam_tiny.tdv')
# df = spark.read.format('csv').option('sep', '\t').schema(schema).load('hdfs://orion11:37000/data/nam/nam_201509.tdv.gz')
# df = spark.read.format('csv').option('sep', '\t').schema(schema).load('hdfs://orion11:37000/data/nam_s/*')
df = spark.read.format('csv').option('sep', '\t').schema(schema).load('hdfs://orion11:37000/data/nam/*')

# df.cache()
# df.persist(StorageLevel.DISK_ONLY)
df.take(1)

[Row(Timestamp=1430438400000, Geohash='dndf9tz5r8eb', geopotential_height_lltw=1915.593994140625, water_equiv_of_accum_snow_depth_surface=0.0, drag_coefficient_surface=0.0, sensible_heat_net_flux_surface=-12.571273803710938, categorical_ice_pellets_yes1_no0_surface=0.0, visibility_surface=24220.529296875, number_of_soil_layers_in_root_zone_surface=3.0, categorical_freezing_rain_yes1_no0_surface=0.0, pressure_reduced_to_msl_msl=101235.0, upward_short_wave_rad_flux_surface=4.25, relative_humidity_zerodegc_isotherm=95.0, categorical_snow_yes1_no0_surface=0.0, u-component_of_wind_tropopause=20.28228759765625, surface_wind_gust_surface=3.9325132369995117, total_cloud_cover_entire_atmosphere=98.0, upward_long_wave_rad_flux_surface=371.25927734375, land_cover_land1_sea0_surface=1.0, vegitation_type_as_in_sib_surface=10.0, v-component_of_wind_pblri=-3.47259521484375, albedo_surface=17.25, lightning_surface=0.0, ice_cover_ice1_no_ice0_surface=0.0, convective_inhibition_surface=-12.582763671875,
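
A quick sanity check on the `Timestamp` column: the value in the row above looks like milliseconds since the Unix epoch. That unit is an assumption (the notebook does not state it), and `ms_to_utc` is a hypothetical helper name, so treat this as a sketch rather than part of the original analysis:

```python
from datetime import datetime, timezone


def ms_to_utc(ts_ms):
    """Convert an epoch timestamp in milliseconds (assumed unit) to a UTC datetime."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)


# Timestamp taken from the first row printed above.
first_row_time = ms_to_utc(1430438400000)
print(first_row_time.isoformat())
```

If the assumption holds, the month of each timestamp can be compared against the month in the file name it was loaded from.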

## Analysis

### [2 pt] Travel Startup: After graduating from USF, you found a startup that aims to provide personalized travel itineraries using big data analysis. Given your own personal preferences, build a plan for a year of travel across 5 locations. Or, in other words: pick 5 regions. What is the best time of year to visit them based on the dataset?

In [2]:
dfList = []
for m in range(1, 13):
    # Zero-pad the month so the path matches files such as nam_201509.tdv.gz
    path = "hdfs://orion11:37000/data/nam/nam_2015%02d.tdv.gz" % m
    df_1 = spark.read.format('csv').option('sep', '\t').schema(schema).load(path)
    dfList.append(df_1)
print(len(dfList))

12


In [3]:
start = datetime.datetime.now().replace(microsecond=0)

month = 1
for df_1 in dfList:  
    df_1.createOrReplaceTempView("TRAVEL_DF")

    # Comfortable range is 68 to 72 °F. With F = 9/5 * (K - 273.15) + 32, that is about 293.15 K to 295.37 K.
    best_time = spark.sql("SELECT Timestamp, Geohash, avg(temperature_surface) as avgtemper, \
                             avg(relative_humidity_zerodegc_isotherm) as avghum FROM TRAVEL_DF \
                                group by Timestamp, Geohash \
                                  having avg(temperature_surface) > 293.15 \
                                     and avg(temperature_surface) < 295.37").collect()
    print('month:', month)
    month = month + 1
    
    count = 0
    for x in best_time:
        f = 9/5 * (x.avgtemper - 273.15) + 32
        c = (f - 32) * 5/9
        THI1 = 1.8 * c - (1 - x.avghum / 100) * (c - 14.3) + 32
    #     print(THI1)
        if count > 4 :
            break
        if THI1 >= 65 and THI1 < 75:
            count = count + 1
            print(x)
            print('THI1 value is:', THI1)

end = datetime.datetime.now().replace(microsecond=0)
print('Job running time:', end - start)

month: 1
Row(Timestamp=1420178400000, Geohash='8vkurbww01b0', avgtemper=293.17681884765625, avghum=48.0)
THI1 value is: 65.07032812500003
Row(Timestamp=1420178400000, Geohash='9usrkvqdgzez', avgtemper=295.30181884765625, avghum=15.0)
THI1 value is: 65.19922790527346
Row(Timestamp=1420178400000, Geohash='8uxh1ktytx00', avgtemper=294.80181884765625, avghum=21.0)
THI1 value is: 65.16533703613283
Row(Timestamp=1420178400000, Geohash='8uz0geczqhpz', avgtemper=294.42681884765625, avghum=27.0)
THI1 value is: 65.20519616699221
Row(Timestamp=1420178400000, Geohash='9kt3kq4je1kp', avgtemper=293.80181884765625, avghum=43.0)
THI1 value is: 65.55273718261722
month: 2
Row(Timestamp=1422748800000, Geohash='dw11jd7dct5b', avgtemper=294.15985107421875, avghum=89.0)
THI1 value is: 69.07964831542972
Row(Timestamp=1422748800000, Geohash='dw4ss4qugy6p', avgtemper=293.78485107421875, avghum=91.0)
THI1 value is: 68.5725953369141
Row(Timestamp=1422748800000, Geohash='9uf5eftyjhpb', avgtemper=294.6598510742187

* Features used: `temperature_surface` and `relative_humidity_zerodegc_isotherm`.
* The Temperature-Humidity Index (THI) was calculated for each month using the formula developed by Kibler (1964):<br> 
Source: https://www.researchgate.net/publication/304717353_An_Estimate_of_Thermal_Comfort_in_North-Central_Region_of_Nigeria <br>
THI1 = 1.8 × Ta − (1 − RH)(Ta − 14.3) + 32
    * Ta = average ambient monthly temperature in °C 
    * RH = average monthly relative humidity as a fraction between 0 and 1 (the dataset stores a percentage, so the code divides by 100)
![](./images/THI1_index_table.png)<br>
* Most people feel comfortable at room temperature, colloquially a range of around 20 to 22 °C (68 to 72 °F). Thermal comfort: https://en.wikipedia.org/wiki/Thermal_comfort
* The recommended range of indoor relative humidity in air-conditioned buildings is generally 30-60%. Relative humidity: https://en.wikipedia.org/wiki/Relative_humidity
* Kelvin (K) to Fahrenheit (°F): F = 9/5 × (K − 273.15) + 32
* Fahrenheit (°F) to Celsius (°C): C = (F − 32) × 5/9
<br>
<br>
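
The conversions and the THI1 formula above can be sketched as small Python helpers. The function names are hypothetical, and the 65 to 75 comfort band is the threshold used in the notebook cell above, not a value taken from Kibler's paper:

```python
def kelvin_to_celsius(k):
    """Convert kelvin to degrees Celsius."""
    return k - 273.15


def kelvin_to_fahrenheit(k):
    """Convert kelvin to degrees Fahrenheit."""
    return 9 / 5 * (k - 273.15) + 32


def thi1(temp_c, rh_percent):
    """THI1 = 1.8*Ta - (1 - RH)*(Ta - 14.3) + 32, with RH supplied as a percentage."""
    rh = rh_percent / 100  # the formula expects a fraction between 0 and 1
    return 1.8 * temp_c - (1 - rh) * (temp_c - 14.3) + 32


def is_comfortable(thi):
    """Comfort band used by the notebook's filter: 65 <= THI1 < 75."""
    return 65 <= thi < 75


# First January row printed above: avgtemper=293.17681884765625 K, avghum=48.0 %
value = thi1(kelvin_to_celsius(293.17681884765625), 48.0)
print(round(value, 2), is_comfortable(value))
```

Converting kelvin straight to Celsius avoids the round trip through Fahrenheit and gives the same result up to floating-point rounding.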

| Best month       |    Geohash    | Average temperature (°F) | Average relative humidity (%) |        THI1       |
|:---------------- |:--------------|:---------------------:|:------------------:|:-----------------:|
| April            | 9mpxfu4kdhhp  | 71.3                  | 45.0               | 67.17723297119144 |
| May              | c292jgr2bwxb  | 70.2                  | 43.0               | 66.27486669921878 |
| June             | 9xyjr7462xrz  | 70.2                  | 55.0               | 67.09072753906254 | 
| November         | djt12vdtexb0  | 70.9                  | 40.0               | 66.52801513671878 |
| December         | 9g76dbr175ez  | 70.4                  | 35.0               | 65.82714477539065 |



#### April, Geohash: 9mpxfu4kdhhp, Average Temperature °F: 71.3, Average Humidity: 45.0,  THI1: 67.17723297119144
![](./images/travel_01.png)<br>
#### May, Geohash: c292jgr2bwxb, Average Temperature °F: 70.2, Average Humidity: 43.0,  THI1: 66.27486669921878
![](./images/travel_02.png)<br>
#### June, Geohash: 9xyjr7462xrz, Average Temperature °F: 70.2, Average Humidity: 55.0,  THI1: 67.09072753906254
![](./images/travel_03.png)<br>
#### November, Geohash: djt12vdtexb0, Average Temperature °F: 70.9, Average Humidity: 40.0,  THI1: 66.52801513671878
![](./images/travel_04.png)<br>
#### December, Geohash: 9g76dbr175ez, Average Temperature °F: 70.4, Average Humidity: 35.0,  THI1: 65.82714477539065
![](./images/travel_05.png)<br>

### [2 pt] SolarWind, Inc.: You get bored enjoying the amazing views from your mansion, so you start a new company; here, you want to help power companies plan out the locations of solar and wind farms across North America. Locate the top 3 places for solar and wind farms, as well as a combination of both (solar + wind farm). You will report a total of 9 Geohashes as well as their relevant attributes (for example, cloud cover and wind speeds).

In [3]:
start = datetime.datetime.now().replace(microsecond=0)

df.createOrReplaceTempView("SOLAR_WIND_DF")

print('Top 3 places for solar:')
cloud = spark.sql("SELECT Geohash, avg(total_cloud_cover_entire_atmosphere) as avgcloud, \
                     avg(surface_wind_gust_surface) as avgwind \
                       FROM SOLAR_WIND_DF WHERE land_cover_land1_sea0_surface = 1 \
                         and total_cloud_cover_entire_atmosphere >= 0 \
                           group by Geohash order by avgcloud ASC limit 3").collect()

for c in cloud:
    print(c)

print('Top 3 places for wind farms:')    
wind = spark.sql("SELECT Geohash, avg(total_cloud_cover_entire_atmosphere) as avgcloud, \
                    avg(surface_wind_gust_surface) as avgwind \
                      FROM SOLAR_WIND_DF WHERE land_cover_land1_sea0_surface = 1 \
                        and geopotential_height_surface > 80 \
                          and total_cloud_cover_entire_atmosphere >= 0 \
                            group by Geohash having avgwind > 6.5 order by avgwind DESC limit 3").collect()

for w in wind:
    print(w)

print('Top 3 places for solar and wind farms:')
solar_wind = spark.sql("SELECT Geohash, avg(total_cloud_cover_entire_atmosphere) as avgcloud, \
                          avg(surface_wind_gust_surface) as avgwind \
                            FROM SOLAR_WIND_DF WHERE land_cover_land1_sea0_surface = 1 \
                              and geopotential_height_surface > 80 \
                                and total_cloud_cover_entire_atmosphere >= 0 \
                                  group by Geohash having avgwind > 6.5").collect()


def comparator(e):
    return e.avgwind - e.avgcloud

solar_wind.sort(reverse=True, key=comparator)

# Print the three highest-scoring rows.
for sw in solar_wind[:3]:
    print(sw, 'comb_solar_wind:', sw.avgwind - sw.avgcloud)

end = datetime.datetime.now().replace(microsecond=0)
print('Job running time:', end-start)

Top 3 places for solar:
Row(Geohash='9mtz1cve0n5b', avgcloud=13.301871440195281, avgwind=2.864942249701797)
Row(Geohash='9mwj3j02xw0p', avgcloud=13.459723352318958, avgwind=3.1962075028795844)
Row(Geohash='9mtzhuf4cbrz', avgcloud=13.545972335231896, avgwind=2.6938682030719696)
Top 3 places for wind farms:
Row(Geohash='f9pz8wckcbw0', avgcloud=72.79739625711962, avgwind=9.304476408392457)
Row(Geohash='f9pbdppft8d0', avgcloud=82.01952807160293, avgwind=9.286219695969459)
Row(Geohash='f9rb1xc31tzz', avgcloud=73.3393002441009, avgwind=9.269183482989924)
Top 3 places for solar and wind farms:
Row(Geohash='d7jjffc3g57b', avgcloud=19.093572009764035, avgwind=7.262419868791074) comb_solar_wind: -11.83115214097296
Row(Geohash='9mq6w3hry67z', avgcloud=22.372660699755897, avgwind=6.57151263059883) comb_solar_wind: -15.801148069157067
Row(Geohash='d7jjtr7ksvh0', avgcloud=23.687550854353134, avgwind=7.485111079679649) comb_solar_wind: -16.202439774673486
Job running time: 0:35:45


#### Example
* Top place for a solar farm: Geohash='9mtz1cve0n5b', avgcloud=13.301871440195281, avgwind=2.864942249701797
![](./images/solar_01.png)<br>
* Top place for a wind farm: Geohash='f9pz8wckcbw0', avgcloud=72.79739625711962, avgwind=9.304476408392457
![](./images/wind_01.png)<br>
* Top place for a combined solar and wind farm: Geohash='d7jjffc3g57b', avgcloud=19.093572009764035, avgwind=7.262419868791074, comb_solar_wind: -11.83115214097296
![](./images/solar_wind_01.png)<br>
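
The combined ranking sorts candidates by `avgwind - avgcloud`. A minimal sketch of that ranking in plain Python, using the three combined rows printed in the output above (the `Site` tuple and `combined_score` name are hypothetical stand-ins for Spark's `Row` and the notebook's `comparator`):

```python
from collections import namedtuple

# Stand-in for the Spark Row objects returned by collect().
Site = namedtuple("Site", ["Geohash", "avgcloud", "avgwind"])

# Values copied from the "solar and wind farms" output above, in shuffled order.
sites = [
    Site("9mq6w3hry67z", 22.372660699755897, 6.57151263059883),
    Site("d7jjtr7ksvh0", 23.687550854353134, 7.485111079679649),
    Site("d7jjffc3g57b", 19.093572009764035, 7.262419868791074),
]


def combined_score(site):
    # Same key as the notebook: more wind and less cloud cover rank higher.
    return site.avgwind - site.avgcloud


ranked = sorted(sites, key=combined_score, reverse=True)
for site in ranked:
    print(site.Geohash, combined_score(site))
```

Note that the two terms are on different scales (cloud cover runs far higher numerically than wind gust here), so cloud cover dominates this score; normalizing both features first would be one way to weight them more evenly.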

## Option 1: Advanced Analysis
### You’ve had the opportunity to analyze two datasets thus far; now it’s time to analyze a dataset of your own. Find a dataset online and use Spark (or Hadoop) to analyze it. You should:

### 1. [0.5 pt] Describe the dataset
* Police Department Incident Reports: Historical 2003 to May 2018 (San Francisco). Source: https://data.sfgov.org/Public-Safety/Police-Department-Incident-Reports-Historical-2003/tmnf-yvry
* What's in this dataset?
    * The .csv file (462.9 MB) contains:   
        * Rows: 2.21M 
        * Columns: 13 
        * Each row is a: Incident Report
* Columns in the dataset:
    * IncidntNum
    * Category
    * Descript
    * DayOfWeek
    * Date
    * Time
    * PdDistrict
    * Resolution
    * Address
    * X
    * Y
    * Location
    * PdId
* Preview
![](./images/PDIR_dataset.png)<br>

### 2. [0.5 pt] Outline the types of insights you hope to gain from it

### 3. [1 pt] Make hypotheses about what you might find
* Which area of San Francisco is the most dangerous?
* Which category of incident occurs most often in San Francisco?
* Which areas have a high number of incidents at each hour of the day, so that people can avoid them or pay more attention there at specific times?
* How many incidents were eventually resolved?
* Which factors might raise or lower the crime rate: time, place, or resolution rate?
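
The questions above mostly reduce to counting rows by one column. A minimal sketch in plain Python, assuming each record is a dict keyed by the column names listed earlier. The sample rows are made up for illustration and are not taken from the dataset; the `HH:MM` time format and the use of `NONE` to mark an unresolved incident are also assumptions:

```python
from collections import Counter

# Made-up sample rows, for illustration only (not real records).
sample_rows = [
    {"Category": "LARCENY/THEFT", "PdDistrict": "SOUTHERN", "Time": "18:30", "Resolution": "NONE"},
    {"Category": "LARCENY/THEFT", "PdDistrict": "SOUTHERN", "Time": "18:45", "Resolution": "ARREST, BOOKED"},
    {"Category": "ASSAULT", "PdDistrict": "MISSION", "Time": "02:10", "Resolution": "NONE"},
    {"Category": "VANDALISM", "PdDistrict": "SOUTHERN", "Time": "18:05", "Resolution": "NONE"},
]


def top_counts(rows, column, n=3):
    """Return the n most frequent values of a column with their counts."""
    return Counter(row[column] for row in rows).most_common(n)


def incidents_by_hour(rows):
    """Count incidents per hour of day, assuming Time is formatted as HH:MM."""
    return Counter(int(row["Time"].split(":")[0]) for row in rows)


def resolved_fraction(rows, unresolved_value="NONE"):
    """Share of incidents whose Resolution differs from the assumed 'unresolved' marker."""
    resolved = sum(1 for row in rows if row["Resolution"] != unresolved_value)
    return resolved / len(rows)


print(top_counts(sample_rows, "Category", 1))
print(top_counts(sample_rows, "PdDistrict", 1))
print(incidents_by_hour(sample_rows).most_common(1))
print(resolved_fraction(sample_rows))
```

On the full 2.21M-row file the same counts would be computed with Spark group-by aggregations rather than in-memory counters.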