### Read in and Combine Daily Data (stored as parquet files)

In [1]:
import os
# from utils.utils import *

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from google.cloud import storage
from pyspark.sql.functions import max, col, count, \
    lit, countDistinct
from pyspark.sql.types import FloatType

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
# Get the active Spark session, or create one registered under the
# application name 'ParquetExample'.
spark = (
    SparkSession.builder
    .appName('ParquetExample')
    .getOrCreate()
)
# Bare expression so the notebook renders the session summary.
spark

### Paths and Data Read

In [3]:
# Load the daily metadata CSV, treating its first row as column names.
# No schema is supplied and schema inference is not requested here.
daily_metadata = (
    spark.read
    .option("header", True)
    .csv("gs://msca-bdp-student-gcs/parkinsons_data/daily_metadata.csv")
)

                                                                                

In [4]:
# Object-name prefix (inside the bucket) under which the daily parquet files live.
daily_path = "parkinsons_data/unlabeled"
# Bucket name only, without the gs:// scheme.
top_bucket_name = "msca-bdp-student-gcs"
# full_path = "msca-bdp-student-gcs/parkinsons_data/unlabeled"

In [5]:
# References:
# https://cloud.google.com/storage/docs/samples/storage-list-files#storage_list_files-python
# add file name as column - https://sparkbyexamples.com/pyspark/pyspark-lit-add-literal-constant/
# https://www.geeksforgeeks.org/how-to-union-multiple-dataframe-in-pyspark/

# List every object stored under the daily-data prefix.
client = storage.Client()
blobs = client.list_blobs(top_bucket_name, prefix = daily_path)

# Read each object as parquet, tag its rows with the object's base name
# (text before the first ".") as "Id", and stack everything into one DataFrame.
daily_data = None
for blob in blobs:
    file_name = blob.name.split("/")[-1].split(".")[0]
    if not file_name:
        # A prefix listing can include objects with no usable base name
        # (e.g. a "folder" placeholder whose name ends in "/"). Reading one
        # would tag its rows with an empty Id, so skip it.
        continue
    print("file name:", file_name)
    df = spark.read.parquet(f"gs://{top_bucket_name}/{blob.name}")
    df = df.withColumn("Id", lit(file_name))
    if daily_data is None:
        daily_data = df
    else:
        # Match columns by name rather than by position so a file written
        # with a different column order cannot be silently misaligned.
        daily_data = daily_data.unionByName(df)

# Fail here with a clear message instead of letting later cells hit
# an AttributeError on None.
if daily_data is None:
    raise FileNotFoundError(
        f"No data files found under gs://{top_bucket_name}/{daily_path}"
    )

   

file name: 00c4c9313d
file name: 07a96f89ec
file name: 0d1bc672a8
file name: 0e333c9833
file name: 164adaed7b
file name: 17e0c0dc86
file name: 1c3719ea59
file name: 1cf80df2d6
file name: 24016102f2
file name: 276630050d
file name: 28e6c306ba
file name: 2caa348298
file name: 32bdbc35a0
file name: 3ae6b0f79f
file name: 3bd159ded0
file name: 3f51a63612
file name: 3fc03f01ed
file name: 3fe2624b51
file name: 40bf6c162f
file name: 418a1ca2c1
file name: 43ac46d679
file name: 48081794eb
file name: 48b636e0f5
file name: 4b84027351
file name: 4e44a97a85
file name: 52fd07ea27
file name: 5535c94fc9
file name: 57741bad42
file name: 5bf570bb7b
file name: 5e13d48878
file name: 6e0303484e
file name: 6ed2f109c3
file name: 74f1e1e0ba
file name: 7ab610bb34
file name: 831c13620e
file name: 88f67f91db
file name: 8959244e1c
file name: 8ca674a988
file name: 924e997065
file name: 93abd37fee
file name: 96f57b4a40
file name: 9da3e3dc66
file name: 9fb7805d99
file name: a213c90b02
file name: b15168b788
file name:

In [6]:
# Print the column names, types and nullability of the combined DataFrame.
daily_data.printSchema()

root
 |-- Time: long (nullable = true)
 |-- AccV: double (nullable = true)
 |-- AccML: double (nullable = true)
 |-- AccAP: double (nullable = true)
 |-- Id: string (nullable = false)



In [7]:
# Preview the first 5 rows of the combined DataFrame.
daily_data.show(5)



+----+------------------+------------------+-----------------+----------+
|Time|              AccV|             AccML|            AccAP|        Id|
+----+------------------+------------------+-----------------+----------+
|   0|          0.328125|         -0.109375|         0.671875|00c4c9313d|
|   1| 0.453107990150706|  -0.1247208674257|0.811273150079803|00c4c9313d|
|   2| 0.423042391192052|-0.264046005447829|0.921238212647563|00c4c9313d|
|   3| 0.150014724987375|-0.310240837149531|0.937482659979879|00c4c9313d|
|   4|-0.202003096762013|-0.545907654638822|0.890842282170504|00c4c9313d|
+----+------------------+------------------+-----------------+----------+
only showing top 5 rows



                                                                                

In [None]:
# Show how many records each partition holds.

def displaypartitions(df):
    """Print the partition count of ``df`` and a per-partition record tally.

    Each row is tagged with the id of the Spark partition it lives in; rows
    are then counted per id and the counts are shown in ascending order,
    displaying up to as many rows as there are partitions.
    """
    partition_total = df.rdd.getNumPartitions()
    print("Partitions:", partition_total)

    tagged = df.withColumn("partitionId", F.spark_partition_id())
    per_partition = tagged.groupBy("partitionId").count()
    per_partition.orderBy(F.asc("count")).show(partition_total)

displaypartitions(daily_data)

Partitions: 550




+-----------+--------+
|partitionId|   count|
+-----------+--------+
|        217|43340237|
|        476|60022027|
|        303|60432918|
|        392|60479060|
|         29|60479239|
|        226|60479291|
|         53|60479351|
|        322|60479467|
|        495|60479468|
|        254|60479484|
|         91|60479492|
|        343|60479526|
|        286|60479547|
|        296|60479547|
|        136|60479547|
|         60|60479559|
|        381|60479596|
|        246|60479601|
|         10|60479608|
|        311|60479647|
|        544|60479670|
|        333|60479704|
|        220|60479705|
|        469|60479716|
|        199|60479758|
|        353|60479768|
|        159|60479815|
|        210|60479895|
|        509|60479898|
|        410|60645100|
|        399|60839503|
|        223|60839621|
|         71|60839697|
|        520|60839792|
|        178|61559279|
|        110|61559323|
|        263|61919466|
|        534|62158042|
|         49|62723265|
|        114|65519465|
|        50

                                                                                

In [None]:
# Sanity check: the number of distinct ids is expected to be 65.
num_ids = (
    daily_data
    .select("Id")
    .dropDuplicates()
    .count()
)
print(f"Number of unique ids: {num_ids}")




Number of unique ids: 65


                                                                                

In [None]:
# Largest value in the Time column across the whole combined DataFrame.
max_time = daily_data.agg(F.max("Time")).first()[0]
max_time


                                                                                

86759734

In [None]:
# Largest value in the AccV column across the whole combined DataFrame.
max_AccV = daily_data.agg(F.max("AccV")).first()[0]
max_AccV

                                                                                

7.999755859375

In [None]:
# Print the schema again to see the column types as they currently stand.
daily_data.printSchema()

root
 |-- Time: long (nullable = true)
 |-- AccV: double (nullable = true)
 |-- AccML: double (nullable = true)
 |-- AccAP: double (nullable = true)
 |-- Id: string (nullable = false)



In [14]:
 # Cast the three accelerometer columns to single-precision floats.
 daily_data = (
     daily_data
     .withColumn("AccV", F.col("AccV").cast(FloatType()))
     .withColumn("AccML", F.col("AccML").cast(FloatType()))
     .withColumn("AccAP", F.col("AccAP").cast(FloatType()))
 )

NameError: name 'FloatType' is not defined

In [None]:
# Print the schema of daily_data as it stands at this point in the notebook.
daily_data.printSchema()

## Join with Subject ID

In [None]:
# Preview a single metadata row to see which columns are available.
daily_metadata.show(1)

In [None]:
# daily_data2 = daily_data.join(daily_metadata, daily_data.Id ==  daily_metadata.Id,"inner")
# Left join on the shared "Id" column: every row of daily_data is kept, with
# metadata columns left null where no metadata row has a matching Id.
daily_data2 = daily_data.join(daily_metadata, on="Id", how="left")


In [None]:
# Turn off Adaptive Query Execution for this session, then preview 2 joined rows.
# NOTE(review): the reason for disabling AQE is not recorded here — confirm it is still needed.
spark.conf.set("spark.sql.adaptive.enabled", "false")
daily_data2.show(2)

In [None]:
# Check merge
# Count of unique non-null and non-empty subject IDs.
# Rows whose "Subject" is null or an empty string are dropped first, so the
# result counts only real subject values attached by the join.
count_unique_subjects = (
    daily_data2
    .filter(col("Subject").isNotNull() & (col("Subject") != ""))
    .select("Subject")
    .distinct()
    .count()
)

print("Count of unique non-null and non-empty subject IDs: ", count_unique_subjects)


### Add Time in Seconds Variable

In [None]:
# Add "TimeSeconds" as Time divided by 100.
# NOTE(review): presumably Time advances 100 units per second — confirm against the data source.
daily_data2 = daily_data2.withColumn(
    "TimeSeconds",
    F.col("Time") / 100,
)
daily_data2.show(5)