In [3]:
#!conda install -c conda-forge faker -y
#!conda install -c conda-forge geopy -y
#!pip install pymysql
#!pip install psycopg2

In [2]:
import json, random
import pandas as pd
from time import time,sleep
from faker import Faker
from geopy import distance

In [41]:
faker = Faker()

# Generate data from faker

In [42]:
data_list = []
for i in range(50):
    data = dict(
        device_id=random.choice(['5390df32-b924-44e9-b416-eb31066741e4','f87141eb-feff-45b8-88a4-0550d58a14ac']),
        temperature=faker.random_int(10, 50),
        location=dict(latitude=str(faker.latitude()), longitude=str(faker.longitude())),
        time=str(int(time()))
    )
    sleep(0.5)
    data_list.append(data)

In [44]:
output = pd.DataFrame()
df_dictionary = pd.DataFrame(data_list)

In [45]:
data_list[0]

{'device_id': '5390df32-b924-44e9-b416-eb31066741e4',
 'temperature': 34,
 'location': {'latitude': '10.7800565', 'longitude': '125.855662'},
 'time': '1693210784'}

In [46]:
df_dictionary = df_dictionary.sort_values(['device_id','time'], ascending = True)
df_dictionary

Unnamed: 0,device_id,temperature,location,time
0,5390df32-b924-44e9-b416-eb31066741e4,34,"{'latitude': '10.7800565', 'longitude': '125.8...",1693210784
1,5390df32-b924-44e9-b416-eb31066741e4,23,"{'latitude': '-51.7737715', 'longitude': '8.45...",1693210785
5,5390df32-b924-44e9-b416-eb31066741e4,33,"{'latitude': '-12.210552', 'longitude': '99.51...",1693210787
7,5390df32-b924-44e9-b416-eb31066741e4,24,"{'latitude': '-6.1952645', 'longitude': '118.7...",1693210788
9,5390df32-b924-44e9-b416-eb31066741e4,10,"{'latitude': '-81.1881545', 'longitude': '-142...",1693210789
10,5390df32-b924-44e9-b416-eb31066741e4,31,"{'latitude': '-42.8248035', 'longitude': '6.31...",1693210789
14,5390df32-b924-44e9-b416-eb31066741e4,45,"{'latitude': '-16.852542', 'longitude': '-67.3...",1693210791
18,5390df32-b924-44e9-b416-eb31066741e4,19,"{'latitude': '29.0324935', 'longitude': '-2.42...",1693210793
20,5390df32-b924-44e9-b416-eb31066741e4,14,"{'latitude': '-57.420107', 'longitude': '-153....",1693210794
22,5390df32-b924-44e9-b416-eb31066741e4,20,"{'latitude': '-31.888895', 'longitude': '161.2...",1693210795


In [47]:
df_dictionary['location_lead'] = df_dictionary.groupby(['device_id'])['location'].shift(-1).fillna(df_dictionary['location'])


In [48]:
df_dictionary.head(51)

Unnamed: 0,device_id,temperature,location,time,location_lead
0,5390df32-b924-44e9-b416-eb31066741e4,34,"{'latitude': '10.7800565', 'longitude': '125.8...",1693210784,"{'latitude': '-51.7737715', 'longitude': '8.45..."
1,5390df32-b924-44e9-b416-eb31066741e4,23,"{'latitude': '-51.7737715', 'longitude': '8.45...",1693210785,"{'latitude': '-12.210552', 'longitude': '99.51..."
5,5390df32-b924-44e9-b416-eb31066741e4,33,"{'latitude': '-12.210552', 'longitude': '99.51...",1693210787,"{'latitude': '-6.1952645', 'longitude': '118.7..."
7,5390df32-b924-44e9-b416-eb31066741e4,24,"{'latitude': '-6.1952645', 'longitude': '118.7...",1693210788,"{'latitude': '-81.1881545', 'longitude': '-142..."
9,5390df32-b924-44e9-b416-eb31066741e4,10,"{'latitude': '-81.1881545', 'longitude': '-142...",1693210789,"{'latitude': '-42.8248035', 'longitude': '6.31..."
10,5390df32-b924-44e9-b416-eb31066741e4,31,"{'latitude': '-42.8248035', 'longitude': '6.31...",1693210789,"{'latitude': '-16.852542', 'longitude': '-67.3..."
14,5390df32-b924-44e9-b416-eb31066741e4,45,"{'latitude': '-16.852542', 'longitude': '-67.3...",1693210791,"{'latitude': '29.0324935', 'longitude': '-2.42..."
18,5390df32-b924-44e9-b416-eb31066741e4,19,"{'latitude': '29.0324935', 'longitude': '-2.42...",1693210793,"{'latitude': '-57.420107', 'longitude': '-153...."
20,5390df32-b924-44e9-b416-eb31066741e4,14,"{'latitude': '-57.420107', 'longitude': '-153....",1693210794,"{'latitude': '-31.888895', 'longitude': '161.2..."
22,5390df32-b924-44e9-b416-eb31066741e4,20,"{'latitude': '-31.888895', 'longitude': '161.2...",1693210795,"{'latitude': '-31.3529895', 'longitude': '119...."


In [49]:
def calculate_distance(location1, location2):
    location1=(float(location1['latitude']),float(location1['longitude']))
    location2=(float(location2['latitude']),float(location2['longitude']))
    miles = distance.distance(location1, location2).miles
    return miles
    

In [50]:
df_dictionary['distance'] = df_dictionary.apply(lambda x: calculate_distance(x.location, x.location_lead), axis=1)

# Tasks

a. The maximum temperatures measured for every device per hours. 

b. The amount of data points aggregated for every device per hours. 

c. Total distance of device movement for every device per hours.

In [53]:
transformed_data = df_dictionary.groupby('device_id').agg(
                        {'temperature':'max',
                         'time':['count','max'],
                         'distance':'sum'}).reset_index()

In [55]:
transformed_data.head()

Unnamed: 0_level_0,device_id,temperature,time,time,distance
Unnamed: 0_level_1,Unnamed: 1_level_1,max,count,max,sum
0,5390df32-b924-44e9-b416-eb31066741e4,50,24,1693210809,130769.171312
1,f87141eb-feff-45b8-88a4-0550d58a14ac,50,26,1693210809,136411.813106


In [68]:
transformed_data.columns = ['device_id','max_temperature','count_data_points','extract_dt','total_distance_km']

In [71]:
transformed_data['extract_dt'] = transformed_data['extract_dt'].apply(lambda x: datetime.utcfromtimestamp(int(x)).strftime('%Y-%m-%d %H:%M:%S'))

In [72]:
transformed_data

Unnamed: 0,device_id,max_temperature,count_data_points,extract_dt,total_distance_km
0,5390df32-b924-44e9-b416-eb31066741e4,50,24,2023-08-28 08:20:09,130769.171312
1,f87141eb-feff-45b8-88a4-0550d58a14ac,50,26,2023-08-28 08:20:09,136411.813106


# Test SQL Engines 

In [76]:
transformed_data.to_sql('devices_agg_data', mysql_engine,if_exists='append', index = False)

2

In [60]:
EXTRACT_DATA_LAST_HOUR_QUERY = """
      select device_id,temperature,location,to_timestamp(time::int) as event_dt
      from devices
      where to_timestamp(time::int) between '{current_timestamp}'::timestamp - interval '59 minutes 59 seconds'
              and '{current_timestamp}'::timestamp order by time
      """

In [14]:
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, BigInteger, DateTime
from sqlalchemy.exc import OperationalError
from datetime import datetime

In [74]:
mysql_engine = create_engine('mysql+pymysql://nonroot:nonroot@localhost/analytics?charset=utf8', pool_pre_ping=True, pool_size=10)

In [23]:
psql_engine = create_engine('postgresql+psycopg2://postgres:password@localhost:5432/main', pool_pre_ping=True, pool_size=10)

In [None]:
extracted_data = pd.read_sql(EXTRACT_DATA_LAST_HOUR_QUERY,psql_engine)
print(extracted_data.shape)

In [26]:
extracted_data.head()

Unnamed: 0,device_id,temperature,location,time
0,9ea0877d-d680-43c5-b1a3-5ac7c08e351e,24,"{""latitude"": ""-70.598631"", ""longitude"": ""-4.41...",1693207674
1,46ba5134-dd09-415e-aaaf-fe5f90231135,32,"{""latitude"": ""-43.4104115"", ""longitude"": ""174....",1693207674
2,25aa87b8-56ed-41f9-b955-706f3bf0f69b,38,"{""latitude"": ""-60.739830"", ""longitude"": ""1.671...",1693207674
3,9ea0877d-d680-43c5-b1a3-5ac7c08e351e,34,"{""latitude"": ""-5.123777"", ""longitude"": ""25.234...",1693207675
4,46ba5134-dd09-415e-aaaf-fe5f90231135,50,"{""latitude"": ""44.061240"", ""longitude"": ""50.889...",1693207675
