# Домашнее задание BigData School Самошина Андрея
**Дано:** логи пользователей с GPS-данными.

Необходимо провести чистку данных, выбрать только тех, у кого размеченые логи и предсказать часть логов. Это задача мульти-класс классификация, потому что одному логу можно сопоставить только однку метку какого-либо класса.

Работа выполнена частично локально (чистка и структурирование данных), остальная работа уже с использованием blob storage.

## Работа с входными данными

После проведения анализа данных было замечено пару особенностей:

*   Не все пользователи имеют разметку таргета (мы их не будем учитывать)
*   Даже у тех, у кого есть разметка, не всегда совпадает с количеством логов (тоже нужно отфильтровать)

Скрипты были написаны локально, потому в этом ноутбуке презентую только их вид и описание. 



### Convert Labels to .csv

Размеченные таргеты в исходном формате предоставлены в формате .txt, что достаточно неудобно. Сконвертируем их в csv. Проходим по всем файлам в папках и если встречается файл с названием labels, то конвертируем его.

In [None]:
import csv, os, sys

def convertToCSV(root, direc):

    with open(root + '/' + direc + '/' + 'labels.txt','rt') as fin:
        cr = csv.reader(fin, delimiter='\t')
        filecontents = [line for line in cr]
    with open(root + '/' + direc + '/' + 'labels_csv.csv','w') as fou:
    	cw = csv.writer(fou, lineterminator='\n')
    	cw.writerows(filecontents)


thedir = "D:\\Big_Data_School_5\DataFrame\Lesson_2\Geolife_Trajectories_1_3\Data_copy"

dirs = [name for name in os.listdir(thedir) if os.path.isdir(os.path.join(thedir, name))]

for direc in dirs:
	if 'labels.txt' in os.listdir(thedir + '/' + direc):
		convertToCSV(thedir, direc)

### Preprocessing logs

Логи в исходном формате тоже в неудобном формате, к тому же первые 6 строк содержат чисто техническую информацию. Обработаем это и также добавим названия колонок для дальнейней работы с ними.

In [None]:
import csv, os, sys

colNames = ['Latitude','Longitude','AllZero','Altitude','NumberOfDays','Date','Time']

def cleanPlt(root, direc1, direc2, colNames):

	# read plt file
	with open(root + '/' + direc1 + '/Trajectory/' + direc2,'rt') as fin:
		cr = csv.reader(fin)
		filecontents = [line for line in cr][6:]
		filecontents.insert(0, colNames)

	# write csv file without header
	with open(root +'/' + direc1 + '/Trajectory/' + direc2[:-4] + '.csv','w') as fou:
		cw = csv.writer(fou, lineterminator='\n')
		cw.writerows(filecontents)

	os.remove(root + '/' + direc1 + '/Trajectory/' + direc2)

thedir = "D:\\Big_Data_School_5\DataFrame\Lesson_2\Geolife_Trajectories_1_3\Data_copy"

dirs = [name for name in os.listdir(thedir) if os.path.isdir(os.path.join(thedir, name))]

for direc in dirs:
    if 'labels_csv.csv' in os.listdir(thedir + '/' + direc):
       	print('Cleaning:', direc)
       	tempdirs = os.listdir(thedir + '/' + direc + '/Trajectory')
       	subdirs = []
       	for item in tempdirs:
       		if not item.endswith('.DS_Store'):
       			subdirs.append(item)
       	for subdir in subdirs:
       		cleanPlt(thedir, direc, subdir, colNames)

### Detect start times in labels

Идея заключается в том, что б обрезать логи только до того времени, которое есть в размеченом датасете, а остальные данные пропустить. Я посчитал, что таким образом потеряется порядка 7% данных с разметкой, что сравнительно немного. В результате секономим довольно много времени, а на количестве данных для обучения это не сильно скажется.

In [None]:
import csv, os, sys

def cleanLabels(root, direc):

	with open(root + '/' + direc + '/labels_csv.csv','rt') as fin:
		cr = csv.reader(fin)
		labelContents = [line for line in cr]
        #print(labelContents)
		startTimes = [entry[0] for entry in labelContents[1:]]
		startLabels = [time.replace('/','').replace(' ','').replace(':','') + '.csv' for time in startTimes]

	trajs = os.listdir(root + '/' + direc + '/Trajectory')

	for traj in trajs:
		if traj not in startLabels:
            
			os.remove(root + '/' + direc + '/Trajectory/' + traj)

	filecontents = [labelContents[0]] #the new labels.csv to be written

	for i in range(len(startLabels)):
		if startLabels[i] in trajs:
			filecontents.append(labelContents[i+1])	

	os.remove(root + '/' + direc + '/labels_csv.csv')

	with open(root + '/' + direc + '/labels_csv.csv','w') as fou:
		cw = csv.writer(fou, lineterminator='\n')
		cw.writerows(filecontents)

thedir = "D:\\Big_Data_School_5\DataFrame\Lesson_2\Geolife_Trajectories_1_3\Data_copy"

dirs = [name for name in os.listdir(thedir) if os.path.isdir(os.path.join(thedir, name))]

for direc in dirs:
    if 'labels_csv.csv' in os.listdir(thedir + '/' + direc):
        cleanLabels(thedir, direc)

### Sorted data

Осталось только отсортировать логи, которые имеют разметку и сопоставляются по времени фиксации данных в разметке, остальные логи удалить. А также перенести в логи метку таргета по соотвествующему промежутку времени в файле labels.

In [None]:
import csv, os, sys
from os.path import isfile, join
import pandas as pd
import numpy as np
import shutil


def labelTraj(root, direc):

	ldf = pd.read_csv(root + '/' + direc + '/labels_csv.csv')
	ldf['Start Time'], ldf['End Time']  = pd.to_datetime(ldf['Start Time']), pd.to_datetime(ldf['End Time'])
	
	trajs = os.listdir(root + '/' + direc + '/Trajectory')
	trajs = [traj for traj in trajs if traj != '.DS_Store']

	os.makedirs(root + '/' + direc + '/Trajectory/Labelled/')
	os.makedirs(root + '/' + direc + '/Trajectory/Unlabelled/')

	labelled = []
	unlabelled= []

	i = 0

	for index, row in ldf.iterrows():

		if i == len(trajs):
			i = 0

		while i < len(trajs):

			traj = trajs[i]

			tdf = pd.read_csv(root + '/' + direc + '/Trajectory/' + traj)
			tdf['datetime'] = tdf.Date + ' ' + tdf.Time
			tdf['datetime'] = pd.to_datetime(tdf.datetime)

			premask = (tdf['datetime'] <= ldf['Start Time'].iloc[index])
			preSubTraj = tdf[premask].copy()

			mask = (tdf['datetime'] <= ldf['End Time'].iloc[index]) & (tdf['datetime'] >= ldf['Start Time'].iloc[index])
			subTraj = tdf[mask].copy()

			if len(subTraj) != 0:
				subTraj['Transportation Mode'] = str(ldf['Transportation Mode'].iloc[index])
				name = str(subTraj['datetime'].iloc[0]).replace('-','').replace(' ','').replace(':','') + '.csv'
				subTraj.to_csv(root + '/' + direc + '/Trajectory/Labelled/' + name, index=False)
				labelled.append(name)

				if len(preSubTraj) != 0:
					name = str(preSubTraj['datetime'].iloc[0]).replace('-','').replace(' ','').replace(':','') + '.csv'
					if name not in labelled and name not in unlabelled:
						preSubTraj['Transportation Mode'] = '-'
						preSubTraj.to_csv(root + '/' + direc + '/Trajectory/Unlabelled/' + name, index=False)
						unlabelled.append(name)

				if index + 1 == len(ldf):
					startIndex = subTraj.index[-1] + 1
					if startIndex > tdf.index[-1]:
						break
					nolabelSubTraj = tdf[startIndex:].copy()
					nolabelSubTraj['Transportation Mode'] = '-'
					name = str(tdf['datetime'].iloc[startIndex]).replace('-','').replace(' ','').replace(':','') + '.csv'
					nolabelSubTraj.to_csv(root + '/' + direc + '/Trajectory/Unlabelled/' + name, index=False)
					unlabelled.append(name)
					break

				if ldf['Start Time'].iloc[index+1] <= tdf['datetime'].iloc[-1]:
					break

				else:
					startIndex = subTraj.index[-1] + 1
					if startIndex > tdf.index[-1]:
						i += 1
						continue 
					nolabelSubTraj = tdf[startIndex:].copy()
					nolabelSubTraj['Transportation Mode'] = '-'
					name = str(tdf['datetime'].iloc[startIndex]).replace('-','').replace(' ','').replace(':','') + '.csv'
					nolabelSubTraj.to_csv(root + '/' + direc + '/Trajectory/Unlabelled/' + name, index=False)
					unlabelled.append(name)
					i += 1
					continue

			if index + 1 == len(ldf):
				break			

			elif ldf['Start Time'].iloc[index + 1] > tdf['datetime'].iloc[-1] and traj not in unlabelled:
				tdf['Transportation Mode'] = '-'
				tdf.to_csv(root + '/' + direc + '/Trajectory/Unlabelled/' + traj, index=False)
				unlabelled.append(traj)
				i += 1
				continue

			if len(subTraj) != 0:
				if subTraj.index[-1] == tdf.index[-1]:
					i += 1

			else:
				break

		else:
			continue

def removeOriginals(root, direc):

	files = [name for name in os.listdir(thedir + '/' + direc + '/Trajectory') if os.path.isfile(os.path.join(thedir + '/' + direc + '/Trajectory', name))]
	files = [file for file in files if file != '.DS_Store']

	for file in files:
		os.remove(root + '/' + direc + '/Trajectory/' + file)

thedir = "D:\\Big_Data_School_5\DataFrame\Lesson_2\Geolife_Trajectories_1_3\Data_copy"

dirs = [name for name in os.listdir(thedir) if os.path.isdir(os.path.join(thedir, name))]

for direc in dirs:
	
    if 'labels_csv.csv' in os.listdir(thedir + '/' + direc):
        print('Labels User:', direc) 
        shutil.move(thedir+ '/' + direc + '/Trajectory/Labelled', thedir+ '/' + direc + '/Traj_Labelled')
        shutil.rmtree(thedir+ '/' + direc + '/Trajectory')
    else:
        print('Unlabels User:', direc)
        shutil.rmtree(thedir+ '/' + direc)

### Upload to Blob

Осталось все полученное загрузить в блоб. 

In [None]:
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient,PublicAccess
import os


def upload_to_blob(r, path_remove, file, container):
    file_path_on_azure = os.path.join(r,file).replace(path_remove,"")
    file_path_on_local = os.path.join(r,file)
    #print(file_path_on_local)

    blob_client = container.get_blob_client(file_path_on_azure)

    with open(file_path_on_local,'rb') as data:
        blob_client.upload_blob(data)

def run_sample():    
    conn_str="DefaultEndpointsProtocol=https;AccountName=mlstorageall;AccountKey=djvx9juh6EJZWqx7T3U2D61BwhPWyEx4BcVyzbt/AQhaNR8hGmVBtzaJnbbG2w4c67p4dZkZIAWK6kq1PbV8Zg==;EndpointSuffix=core.windows.net"
    container_name="samoshyn"    
    
    path_remove = "D:\\Big_Data_School_5\DataFrame\Lesson_2\Geolife_Trajectories_1_3\\"
    local_path = "D:\\Big_Data_School_5\DataFrame\Lesson_2\Geolife_Trajectories_1_3\AllData_preprocess" #the local folder

    service_client=BlobServiceClient.from_connection_string(conn_str)
    container_client = service_client.get_container_client(container_name)  
    
    file_check = 0
    file = None
    total = 0
    for r,d,f in os.walk(local_path):
        
        if file_check==1:
            #print(r)
            total = total + 1
            for file in f:
                #print(os.path.join(r,file))
                upload_to_blob(r, path_remove, file, container_client)
            print('Upload user:', total)
            file_check = 0
        if f:
            
            for file in f:
               #print(file)    
               if file=='labels_csv.csv':
                    #print(os.path.join(r,file))
                    upload_to_blob(r, path_remove, file, container_client)
                    file_check = 1
    print('----Total users:', total)


run_sample()

На выходе получаем все данные, загруженные в блоб сторедж с сохранением иерархичности папок.

![1](https://drive.google.com/uc?export=view&id=1Q0LYBPWx5c9ScuVpilehj5ErtcbPMXr2)

**Заметка:** *К сожалению, у меня не получилось прочитать в Google Colab файлы из стореджа (при аналогичной операции в Датабриксе проблем нету), хотя пройтись по ним я могу без проблем, об этом ниже. И поскольку я работал локально, то использовал загруженный архив этих же почищенных данных.*

## Установка Spark и инициализация

По техническим причинам Датабрикс недоступен длительное время, потому я решил работать в Google Colab (community version Датабрикс был довольно слабый). При таком раскладе я не могу работать над дополнительным заданием в виде считывания данных потоками, но лучше сделать костяк и в случае чего доделать в другой среде. Дополнительно нужно было установить Spark.

In [1]:
# Run below commands in google colab
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark3.0.0
!wget -q http://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
# unzip it
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
# install findspark 
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
!pip install azure-storage-blob

## Подключаем сторедж

In [None]:
!pip install azure-storage

In [6]:
storage_account_name = 'mlstorageall'
storage_account_access_key = 'djvx9juh6EJZWqx7T3U2D61BwhPWyEx4BcVyzbt/AQhaNR8hGmVBtzaJnbbG2w4c67p4dZkZIAWK6kq1PbV8Zg=='
spark.conf.set('fs.azure.account.key.' + storage_account_name + '.blob.core.windows.net', storage_account_access_key)

In [7]:
blob_container = 'samoshyn'
#general_folder = 'TestFolder'
general_folder = 'AllData_preprocess'
filePath = "wasbs://" + blob_container + "@" + storage_account_name + ".blob.core.windows.net/" + general_folder

Проверяем, что мы подключились куда нужно и выведем пути всех нужных логов.

In [33]:
import azure.storage
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
import os

connect_str = "DefaultEndpointsProtocol=https;AccountName=mlstorageall;AccountKey=djvx9juh6EJZWqx7T3U2D61BwhPWyEx4BcVyzbt/AQhaNR8hGmVBtzaJnbbG2w4c67p4dZkZIAWK6kq1PbV8Zg==;EndpointSuffix=core.windows.net"
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_client=blob_service_client.get_container_client(blob_container)

blob_list = container_client.list_blobs(name_starts_with="AllData_preprocess/")
for blob in blob_list:
  path_list = blob.name.split(os.sep)
  #print(path_list)
  #print(blob.name)

Дальше работаем с данными с архива о котором я упомянул выше.

In [10]:
#!unzip -q /content/TestFolder.zip
!unzip -q /content/AllData_preprocess.zip

## Предобработка данных

Ниже представлены все функции, которые я использовал для работы с данными и их применение на одном логе для эффективной траты времени. Дальше будем масштабировать на всех юзеров.

Для начала правильно считаем данные и выведем полученный датафрейм. Я сразу удалил несколько столбцов ('Date', 'Time', 'AllZero', 'NumberOfDays') и заменил их одной фичей datetime.

In [11]:
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, FloatType, DataType, DateType
from pyspark.sql.window import Window
from pyspark.sql.functions import *
import numpy as np
import math
import pyspark.sql.types as t
from itertools import chain

#filePath = 'TestFolder'
filePath = 'AllData_preprocess'
def_path = '/010/Traj_Labelled/20080403160000.csv'

geoSchema = StructType([ \
      StructField("Latitude", FloatType(), True), \
      StructField("Longitude", FloatType(), True), \
      StructField("AllZero", IntegerType(), True), \
      StructField("Altitude", IntegerType(), True), \
      StructField("NumberOfDays", FloatType(), True), \
      StructField("Date", DateType(), True), \
      StructField("Time", StringType(), True), \
      StructField("datetime", StringType(), True), \
      StructField("Transportation Mode", StringType(), True)])

def readcsv(path, schema):

  infer_schema = "false"
  first_row_is_header = "true"
  delimiter = ","
  file_type = "csv"

  df = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .schema(geoSchema) \
    .load(filePath+path)

  drop_list = ['Date', 'Time', 'AllZero', 'NumberOfDays']
  df = df.select([column for column in df.columns if column not in drop_list])
  
  return df

df_expample = readcsv(def_path, geoSchema)
df_expample.show(5)

+---------+---------+--------+-------------------+-------------------+
| Latitude|Longitude|Altitude|           datetime|Transportation Mode|
+---------+---------+--------+-------------------+-------------------+
|41.765053| 83.34479|    -777|2008-04-03 16:00:00|              train|
|41.765114|83.345116|    -777|2008-04-03 16:00:01|              train|
|41.765175| 83.34545|    -777|2008-04-03 16:00:02|              train|
| 41.76523|83.345795|    -777|2008-04-03 16:00:03|              train|
| 41.76528| 83.34614|    -777|2008-04-03 16:00:04|              train|
+---------+---------+--------+-------------------+-------------------+
only showing top 5 rows



Вспомогательные функции для генерации фич: переход в Декартову систему координат через сферические, поиск расстояния между двумя точками и поиск угла смещения.

In [12]:
def get_cartesian(lat,lon):
    lat = lat*math.pi/180
    lon = lon*math.pi/180
    R = 6371000
    x = R * math.cos(lat) * math.cos(lon)
    y = R * math.cos(lat) * math.sin(lon)
    return t.Row('Out1', 'Out2')(x,y)
  
def dist(long_x, lat_x, long_y, lat_y):
  if long_y==None or lat_y==None:
    return float(0.01) 
  dx = math.pow((long_x-long_y),2)
  dy =  math.pow((lat_x-lat_y),2)
  return  math.pow((dx+dy), 0.5)

def angle(x0, x1, y0, y1):
  if (x0==0 and x1==0) or (y0==0 and y1==0):
    return float(0.01) 
  lenx =  math.pow(( math.pow(x0,2) + math.pow(x1,2)), 0.5)
  leny =  math.pow(( math.pow(y0,2) + math.pow(y1,2)), 0.5)
  
  return math.acos((x0 * y0 + x1 * y1)/ (lenx * leny))

**Теперь приступаем к генерации фич:** 

*   Для начала, переведем координаты в декартовы, что б можно было легко считать расстояния **(X_coord, Y_coord)**.
*   Дальше считаем длины между двумя соседними кординатами (в пределах одного лога) путем взятия так называемого лага (сдвига) нашей координаты и дальнейшей передачи этих точек в функцию для поиска длины. Получим расстояние, которое преодолел юзер в промежуток времени [Tn, Tn+1] **(Dist)**.
*   Аналогично считаем продолжительнось промежутка времени [Tn, Tn+1].
*   И ищем **скорость** путем деления Dist на полученное время.
*   (Протестировано) Существенного приоста следующая фича не дала, но для отчетности добавлю: ищем **угол смещения** с точки А в точку Б. Т.е. насколько у нас изменилось направление траектории. По-идее, будет влиять на определение самолетов и метро(они в основном не сильно петляют), но не помогло. Потому для экономии времени я её не использовал.



In [13]:
def FeatureEng(df):
  
  schema = StructType([
    StructField("X_coord", FloatType(), False),
    StructField("Y_coord", FloatType(), False)])
    
  get_cartesian_udf = udf(lambda x,y: get_cartesian(x,y), schema)
  df = df.withColumn("Output", get_cartesian_udf('Latitude', 'Longitude'))
  df = df.select("Latitude", "Longitude", "Altitude", "datetime", "Transportation Mode", "Output.*")

  get_dist_udf = udf(lambda x,y,z,g: dist(x,y,z,g), FloatType())
  df = df.withColumn("Dist", get_dist_udf(
      "X_coord", "Y_coord",
      lag("X_coord", 1).over(Window().orderBy("datetime")), lag("Y_coord", 1).over(Window().orderBy("datetime"))))
  
  df = df.withColumn("lag_time", lag("datetime", 1).over(Window().orderBy("datetime"))).fillna({'lag_time': df.head().datetime})
  timeFmt = "yyyy-MM-dd' 'HH:mm:ss"
  timeDiff = (unix_timestamp('datetime', format=timeFmt) - unix_timestamp('lag_time', format=timeFmt))
  df = df.withColumn("Duration", timeDiff).drop('lag_time')
  df = df.withColumn("Speed", col("Dist")/col("Duration")).fillna({'Speed': 0.01})
  
  '''get_angle_udf = udf(lambda x,y,z,g: angle(x,y,z,g), FloatType())
  df = df.withColumn("prev_X", lag("X_coord", 1).over(Window().orderBy("datetime")))
  df = df.withColumn("prev_Y", lag("Y_coord", 1).over(Window().orderBy("datetime")))
  df = df.withColumn("dx", when(isnull(col('X_coord') - col('prev_X')), 0.01)
                                .otherwise(col('X_coord') - col('prev_X'))).drop('prev_X')
  df = df.withColumn("dy", when(isnull(col('Y_coord') - col('prev_Y')), 0.01)
                                .otherwise(col('Y_coord') - col('prev_Y'))).drop('prev_Y')
  
  df = df.withColumn("prev_dx", lag("dx", 1).over(Window().orderBy("datetime")))
  df = df.withColumn("prev_dy", lag("dy", 1).over(Window().orderBy("datetime")))
  df = df.withColumn("prev_dx", when(isnull(col('prev_dx')), 0.01)
                                .otherwise(col('prev_dx')))
  df = df.withColumn("prev_dy", when(isnull(col('prev_dy')), 0.01)
                                .otherwise(col('prev_dy')))
  
  df = df.withColumn("Angle", get_angle_udf("dx", "prev_dx", "dy", "prev_dy")).drop('dx').drop('dy').drop('prev_dx').drop('prev_dy')
  df = df.withColumn("DistByAngle", when(isnull(col('Dist') / col('Angle')), 0.01)
                                .otherwise(col('Dist') / col('Angle')))'''
  
  
  return df 



temp = FeatureEng(df_expample)
temp.show(50)

+---------+---------+--------+-------------------+-------------------+---------+---------+---------+--------+------------------+
| Latitude|Longitude|Altitude|           datetime|Transportation Mode|  X_coord|  Y_coord|     Dist|Duration|             Speed|
+---------+---------+--------+-------------------+-------------------+---------+---------+---------+--------+------------------+
|41.765053| 83.34479|    -777|2008-04-03 16:00:00|              train| 550731.9|4719995.5|     0.01|       0|              0.01|
|41.765114|83.345116|    -777|2008-04-03 16:00:01|              train| 550704.4|4719994.0|27.540878|       1|27.540878295898438|
|41.765175| 83.34545|    -777|2008-04-03 16:00:02|              train| 550676.2|4719993.0|28.205233|       1|28.205232620239258|
| 41.76523|83.345795|    -777|2008-04-03 16:00:03|              train|550647.44|4719992.5|28.754347|       1| 28.75434684753418|
| 41.76528| 83.34614|    -777|2008-04-03 16:00:04|              train| 550618.7|4719991.5|28.7673

**Теперь переходим в генерации окон.** При помощи их, можно будет создать ранговые и аггрегационные функции наших исходных фичей в заданном промежутке времени. Экспериментальным путем было решено использовать размер окна **120 секунд**, так как есть корокие логи, которые даже не захватили б больший промежуток. А там хранилась полезная информаци про логирование пути в метро (лог попросту прерывался и не был продолжительным по времени).

И параллельно проведем **Label Encoding** наших меток таргета, т.е. заменим метки класса на цифровые значения и обьеденим несколько классов в один, так как по своей логике перемещения они ничем не отличаются и отличить их будет сложно.

In [14]:
def CreateWindow(df):
  
  agg_temp = df.groupBy(window(df.datetime, "120 seconds")).agg(mean('X_coord'), \
                                                                mean('Y_coord'), \
                                                                mean('Altitude'), \
                                                                #mean('Dist'), \
                                                                sum('Dist'), \
                                                                stddev('Dist'), \
                                                                mean('Speed'), \
                                                                #first('Speed'), \
                                                                (-first('Speed')+last('Speed')).alias('DeltaSpeed'), \
                                                                #mean('Angle'), \
                                                                #stddev('Angle'), \
                                                                #mean('DistByAngle'), \
                                                                first('Transportation Mode')).drop('window')
  TransportDict = {'walk':1,
              'bike':2,
              'bus':3,
              'car':4,
              'taxi':4,
              'subway,':5,
              'train':6,
              'airplane':7,
              'boat':8,
              'motorcycle':9}

  mapping = create_map([lit(x) for x in chain(*TransportDict.items())])
  agg_temp = agg_temp.withColumn('Transportation Mode', mapping[agg_temp['first(Transportation Mode)']]).drop('first(Transportation Mode)')
  
  return agg_temp

temp_final = CreateWindow(temp)
temp_final.show(1)

+-----------------+-----------------+-------------+------------------+-----------------+------------------+------------------+-------------------+
|     avg(X_coord)|     avg(Y_coord)|avg(Altitude)|         sum(Dist)|stddev_samp(Dist)|        avg(Speed)|        DeltaSpeed|Transportation Mode|
+-----------------+-----------------+-------------+------------------+-----------------+------------------+------------------+-------------------+
|549080.6384698276|4719942.254310345|       -777.0|3373.7341477964073|8.125190130579002|28.090952043642947|30.379400482177733|                  6|
+-----------------+-----------------+-------------+------------------+-----------------+------------------+------------------+-------------------+
only showing top 1 row



## Генерализация функций

Переносим наши полученные фичи на всех юзеров. Для этого проходим по всем юзерам и логам и проводим предобработку каждого отдельно. **Альтернативный подход:** локально соединить все логи с указанием юзера, но могли возникнуть вопросы с накладыванием данных одного юзера на другого в пределах одного окна + для допольнительного задания было б сложно добавлять в нужное место новую потоковую информацию про юзера.

In [15]:
import azure.storage
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
import os

connect_str = "DefaultEndpointsProtocol=https;AccountName=mlstorageall;AccountKey=djvx9juh6EJZWqx7T3U2D61BwhPWyEx4BcVyzbt/AQhaNR8hGmVBtzaJnbbG2w4c67p4dZkZIAWK6kq1PbV8Zg==;EndpointSuffix=core.windows.net"
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_client=blob_service_client.get_container_client(blob_container)

user = None
trajs = []
usersDict = {}
blob_list = container_client.list_blobs(name_starts_with="AllData_preprocess/")

for blob in blob_list:
  path_list = blob.name.split(os.sep)
  if path_list[1]!=user:
    user = path_list[1]
    trajs = []
  if len(path_list)>3:
    if (path_list[2]!='labels_csv.csv') & (path_list[1]==user):
      trajs.append(path_list[3])
      usersDict[user] = trajs


Получаем итоговый датафрейм с аггрегированными фичами для всех юзеров и метками таргета. Ориентировочное время работы 4 часа.

In [None]:
aggschema = StructType([ \
      StructField("avg(X_coord)", FloatType(), True), \
      StructField("avg(Y_coord)", FloatType(), True), \
      StructField("avg(Altitude)", FloatType(), True), \
      #StructField("avg(Dist)", FloatType(), True), \
      StructField("sum(Dist)", FloatType(), True), \
      StructField("stddev_samp(Dist)", FloatType(), True), \
      StructField("avg(Speed)", FloatType(), True), \
      #StructField("first(Speed)", FloatType(), True), \
      StructField("DeltaSpeed", FloatType(), True), \
      #StructField("avg(Angle)", FloatType(), True), \
      #StructField("stddev_samp(Angle)", FloatType(), True), \
      #StructField("avg(DistByAngle)", FloatType(), True), \
      StructField("Transportation Mode", IntegerType(), True)])

df_full = spark.createDataFrame(data=[], schema=aggschema)
for user in usersDict:
  print(user)
  for log in usersDict[user]:
    #print(log)
    df = readcsv("/" + user + "/Traj_Labelled/" + log, geoSchema)
    df1 = FeatureEng(df)
    df2 = CreateWindow(df1)
    df_full = df_full.unionByName(df2)
  #print(user)

## Моделинг

Я должен выполнить некоторые преобразования, чтобы объединить все столбцы функций в один столбец при помощи **VectorAssembler** для дальнейшего моделирования. Заменяем столбцы функций одним столбом с вектором функций.

In [22]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

df_full_without_nan = df_full.fillna(0)
vector_assembler = VectorAssembler(\
inputCols=["avg(X_coord)", "avg(Y_coord)", "avg(Altitude)", "sum(Dist)", "stddev_samp(Dist)", "avg(Speed)", "DeltaSpeed"],\
outputCol="features")
df_temp = vector_assembler.transform(df_full_without_nan)
df = df_temp.drop("avg(X_coord)", "avg(Y_coord)", "avg(Altitude)", "sum(Dist)", "stddev_samp(Dist)", "avg(Speed)", "DeltaSpeed")
#df.show(5)

После векторизации нужно разделить наши данные на обучающие и тестовые наборы (30% оставлено для тестирования):

In [24]:
(train, test) = df.randomSplit([0.70,0.30])

В качестве модели используем **RandomForest**. Ансамбли деревьев решений являются одними из самых популярных алгоритмов для задач классификации. Из-за комбинации множества деревьев решений классификатор случайного леса имеет меньший риск переобучения. API на основе DataFrame для машинного обучения поддерживает случайные леса как для двоичной, так и для мультиклассовой классификации.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf = RandomForestClassifier(labelCol="Transportation Mode", featuresCol="features", numTrees=10)
dt = DecisionTreeClassifier(labelCol="Transportation Mode", featuresCol="features")
model = rf.fit(train)

Теперь можно делать прогнозы и смотреть метрики:

In [None]:
predictions = model.transform(test)

In [None]:
predictions.select("prediction", "Transportation Mode").show(5)

+----------+-------------------+
|prediction|Transportation Mode|
+----------+-------------------+
|       6.0|                  4|
|       1.0|                  1|
|       1.0|                  1|
|       1.0|                  1|
|       1.0|                  1|
+----------+-------------------+
only showing top 5 rows



Чтобы оценить точность прогноза, необходимо вычислить точность на тесте:

In [29]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator =\
MulticlassClassificationEvaluator(labelCol="Transportation Mode",\
predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = ", accuracy)

Test set accuracy =  0.7026258205689278


Точность достаточно неплохая, учитывая количество классов.

In [30]:
!pip install nbconvert



In [32]:
%%shell
jupyter nbconvert --to html /content/Spark_GeoLife_HW_Samoshyn.ipynb

[NbConvertApp] Converting notebook /content/Spark_GeoLife_HW_Samoshyn.ipynb to html
[NbConvertApp] Writing 730266 bytes to /content/Spark_GeoLife_HW_Samoshyn.html


