# Spark_IA_Assignment
* The data is a time series of user locations.
* Each observation records the location of a user at the given timestamp.
* Assume that a person stays at the same place until the next observation from that user arrives.
* Design a system that processes this data and computes the average number of devices within a 200 m radius for every 10-minute time window.
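The "stays in place until the next observation" rule is a last-observation-carried-forward lookup. The following is a minimal pure-Python sketch of that lookup for a single device. It assumes observations are `(timestamp in seconds, latitude, longitude)` tuples already sorted by time. `location_at` and the sample values are hypothetical and are not part of the original notebook.

```python
from bisect import bisect_right

def location_at(observations, t):
    """Return the last known (lat, lon) of one device at time t, or None.

    `observations` is a list of (timestamp, lat, lon) tuples sorted by timestamp.
    Because a device is assumed to stay put until its next observation, its
    position at t is the latest observation whose timestamp is <= t.
    """
    timestamps = [ts for ts, _, _ in observations]
    i = bisect_right(timestamps, t)  # number of observations at or before t
    if i == 0:
        return None  # device not seen yet at time t
    _, lat, lon = observations[i - 1]
    return (lat, lon)

# Hypothetical sample: one device seen at t = 0 s, 300 s and 900 s
sample_obs = [(0, 12.97, 77.59), (300, 12.98, 77.60), (900, 12.99, 77.61)]
print(location_at(sample_obs, 600))  # (12.98, 77.6): still at the t = 300 s position
```

You can evaluate such a lookup for every device at a fixed instant, for example the end of each 10-minute window. This gives a snapshot of positions whose neighbour counts can then be averaged.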

# Describe your design considerations and assumptions
## Understanding of the given task
* Each record contains a user's location, a timestamp, a device ID (assuming the same device does not have multiple device IDs) and other fields.
* As stated, a person/device is assumed to stay in the same place until an updated observation for that person/device arrives.
* The goal is the average number of devices within a 200 m radius of each device.
* Example: if device A and device B have 10 and 20 devices within 200 m respectively, the average number of devices within a 200 m radius is (10 + 20) / 2 = 15.
* Finally, this average has to be calculated for every 10-minute window.
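Once each device has a neighbour count for a window, the averaging in the example above becomes a group-by-window mean. The following is a minimal pure-Python sketch of that mean. It assumes Unix timestamps in seconds, 10-minute tumbling windows aligned to the epoch, and exactly one `(timestamp, device, nearby_count)` row per device per window. `window_start`, `average_per_window` and the sample rows are hypothetical.

```python
from collections import defaultdict

WINDOW_SECONDS = 10 * 60  # 10-minute tumbling windows

def window_start(ts):
    """Align a Unix timestamp (seconds) to the start of its 10-minute window."""
    return ts - ts % WINDOW_SECONDS

def average_per_window(counts):
    """Average nearby-device counts per window.

    `counts` is an iterable of (timestamp, device_id, nearby_count) rows,
    assumed to hold one row per device per window.
    Returns {window_start: mean nearby_count over the rows in that window}.
    """
    totals = defaultdict(lambda: [0, 0])  # window -> [sum of counts, number of rows]
    for ts, _device, nearby in counts:
        bucket = totals[window_start(ts)]
        bucket[0] += nearby
        bucket[1] += 1
    return {w: s / n for w, (s, n) in totals.items()}

# Devices A and B see 10 and 20 neighbours in the first window, C sees 4 in the next one
sample_rows = [(30, "A", 10), (450, "B", 20), (700, "C", 4)]
print(average_per_window(sample_rows))  # {0: 15.0, 600: 4.0}
```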

## Platform used
* I used Databricks Community Edition to develop and run the whole task. It provides IPython-style notebooks and an easy-to-use single-node cluster.

## Creating the Spark session

```python
from pyspark.sql import SparkSession

# On Databricks `spark` and `sc` are pre-created; getOrCreate() simply returns them
spark = SparkSession.builder.appName("IA_Spark_Assignment").getOrCreate()
sc = spark.sparkContext
```

## Importing the required packages and defining the schema

```python
from pyspark.sql.types import (StructType, StructField, StringType, DoubleType,
                               DateType, TimestampType, IntegerType)

inputPath = "/FileStore/tables/Raw_Data/"

# The data format is known in advance, so define the schema explicitly:
# Spark then needs no extra pass over the data to infer column types
csvSchema = StructType([
    StructField('Location_Index', StringType()),
    StructField('Device_ID', StringType()),
    StructField('Latitude', DoubleType()),
    StructField('Longitude', DoubleType()),
    StructField('Date', DateType()),
    StructField('Network', StringType()),
    StructField('Time_Stamp', TimestampType()),
    StructField('Pin_Code', IntegerType()),
    StructField('City', StringType()),
])
```

## DataFrame representing the data in the CSV files
* This reads all the CSV files present in `inputPath`.

```python
# Read every CSV file under inputPath using the schema defined above
staticInputDF = (
    spark
    .read
    .schema(csvSchema)
    .csv(inputPath)
)
staticInputDF.show(5)  # print the first 5 rows
```

## Distance function
* Define a Python function that calculates the distance between two coordinates from their latitudes and longitudes.
* Wrap the Python function as a Spark UDF (user-defined function) so it can be used in SQL.
* Reference: https://www.geeksforgeeks.org/program-distance-two-points-earth/
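For reference, the haversine formula used by the function is shown below. Latitudes φ and longitudes λ are in radians, and R = 6371 km is the mean Earth radius that the code uses:

$$
a = \sin^2\!\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \sin^2\!\left(\frac{\lambda_2 - \lambda_1}{2}\right), \qquad
d = 2R \arcsin\!\left(\sqrt{a}\right)
$$

As a scale check, 1° of latitude corresponds to R·π/180 ≈ 111.19 km. A 200 m radius therefore spans only about 0.0018° of latitude.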

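```python
from math import radians, cos, sin, asin, sqrt
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def device_distance(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two (lat, lon) points (haversine formula)."""
    # Return null for rows with missing coordinates instead of failing the whole job
    if None in (lat1, lon1, lat2, lon2):
        return None

    # Convert degrees to radians
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))

    # Haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    c = 2 * asin(min(1.0, sqrt(a)))  # clamp guards against tiny floating-point overshoot

    # Mean radius of the Earth in kilometres
    r = 6371

    # Convert kilometres to metres and round to the nearest whole metre
    return round(c * r * 1000)

distance_UDF = udf(device_distance, IntegerType())
spark.udf.register("distance_UDF", distance_UDF)  # make it callable from Spark SQL
```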

## Keeping the latest row per device
* Keep only the most recent observation (by `Time_Stamp`) of each device, i.e. its latest known location (coordinates).
* This leaves one row per `Device_ID` and removes redundant rows, assuming the same device does not have multiple device IDs.
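The rule for this step is one row per device, namely its newest observation, regardless of the order in which rows arrive. The following plain-Python sketch expresses that rule. It assumes rows are `(device_id, timestamp, lat, lon)` tuples. `latest_per_device` and the sample data are hypothetical.

```python
def latest_per_device(rows):
    """Keep the most recent row for each device, whatever order the rows come in.

    `rows` are (device_id, timestamp, lat, lon) tuples.
    Returns {device_id: newest row for that device}.
    """
    latest = {}
    for row in rows:
        device_id, ts = row[0], row[1]
        # Replace the stored row only if this one is strictly newer
        if device_id not in latest or ts > latest[device_id][1]:
            latest[device_id] = row
    return latest

# Rows deliberately out of time order
sample_rows = [("A", 100, 1.0, 1.0), ("A", 300, 1.1, 1.1), ("B", 200, 2.0, 2.0), ("A", 250, 1.2, 1.2)]
print(sorted(latest_per_device(sample_rows).values()))
# [('A', 300, 1.1, 1.1), ('B', 200, 2.0, 2.0)]
```

In Spark, `dropDuplicates` by itself does not promise which duplicate survives. A per-device window ordered by timestamp, keeping the first row, is the usual way to get this result deterministically.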

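```python
from pyspark.sql import Window, functions as F
# Rank each device's rows newest-first and keep rank 1 (dropDuplicates() after orderBy() does not guarantee this)
latest = Window.partitionBy("Device_ID").orderBy(F.col("Time_Stamp").desc())
Unique_rows_by_DeviceID = staticInputDF.withColumn("rn", F.row_number().over(latest)).filter("rn = 1").drop("rn")
```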

## Registering "Unique_rows_by_DeviceID" as a temporary view

```python
# Expose the one-row-per-device DataFrame to Spark SQL (row order does not matter for a view)
Unique_rows_by_DeviceID.createOrReplaceTempView("Device_raw_table")
```

## Querying the temporary view with SQL
* The query returns a DataFrame with 2 columns.

## Column summary
* `D_ID1`: the ID of the device.
* `nearbyCount`: the number of other devices within a 200 m radius of that device. A device with no other device within 200 m produces no row, because the `where D_UDF <= 200` filter removes all of its pairs.

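```python
# Pair every device with every other device, compute their distance in metres,
# keep the pairs within 200 m and count the neighbours of each device
s1 = """
select D_ID1, count(1) as nearbyCount
from (
    select t1.Device_ID as D_ID1,
           t2.Device_ID as D_ID2,
           distance_UDF(t1.Latitude, t1.Longitude, t2.Latitude, t2.Longitude) as D_UDF
    from Device_raw_table t1, Device_raw_table t2
    where t1.Device_ID != t2.Device_ID
)
where D_UDF <= 200
group by D_ID1
"""
device_nearbyCount = spark.sql(s1)
```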
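The SQL query counts only the pairs that pass `where D_UDF <= 200`. As a result, a device with no neighbours yields no row and drops out of the average. The following is a pure-Python sketch of the same pairwise count that keeps such devices at zero, so you can compare the two definitions on a small sample. `haversine_m`, `nearby_counts` and the coordinates are hypothetical test values. The distance here is left unrounded, unlike `device_distance`.

```python
from math import radians, sin, cos, asin, sqrt

def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres (haversine formula, R = 6371 km, unrounded)."""
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * 6371000 * asin(min(1.0, sqrt(a)))

def nearby_counts(devices, radius_m=200):
    """For every device, count the *other* devices within radius_m metres.

    `devices` maps device_id -> (lat, lon). Devices with no neighbours get 0.
    """
    ids = list(devices)
    counts = {d: 0 for d in ids}
    for i, a in enumerate(ids):
        for b in ids[i + 1:]:  # each unordered pair once; distance is symmetric
            if haversine_m(*devices[a], *devices[b]) <= radius_m:
                counts[a] += 1
                counts[b] += 1
    return counts

# A and B are about 60 m apart; C is several kilometres away from both
sample_devices = {"A": (12.9716, 77.5946), "B": (12.9720, 77.5950), "C": (13.0500, 77.7000)}
sample_counts = nearby_counts(sample_devices)
print(sample_counts)  # {'A': 1, 'B': 1, 'C': 0}

with_zero = sum(sample_counts.values()) / len(sample_counts)  # 2/3: C included
nonzero = [c for c in sample_counts.values() if c > 0]
without_zero = sum(nonzero) / len(nonzero)                      # 1.0: C dropped, as in the SQL
```

Like the self-join, this sketch is quadratic in the number of devices, which is fine for a sample but costly on the full data.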

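## Calculating the overall average

```python
# Mean of nearbyCount across all devices in device_nearbyCount;
# display() is a Databricks notebook helper (use counts.show() elsewhere)
counts = device_nearbyCount.agg({'nearbyCount': 'avg'})
display(counts)
```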

| avg(nearbyCount) |
|---|
| 203.60722668753112 |


## Conclusion
* Using each device's latest known location, devices that have at least one other device within 200 m have, on average, about 203.6 other devices within that radius.

## Other references followed for Spark DataFrame transformations
* https://www.udemy.com/course/pyspark-aws-master-big-data-with-pyspark-and-aws/

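## Saving the result

```python
# Spark writes a directory named avg_results.csv containing part files;
# overwrite lets the cell be re-run, header=True keeps the column name
counts.write.mode("overwrite").csv("avg_results.csv", header=True)
```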