In [None]:
import os
import pyspark
from pyspark import SparkConf, Spark

In [1]:
Context
from pyspark.sql import SparkSession

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'
conf = SparkConf().setAppName("Medhere").setMaster("local[*]")
#extra memory
SparkContext.setSystemProperty('spark.executor.memory', '15g')
SparkContext.setSystemProperty('spark.driver.memory', '15g')

sc = pyspark.SparkContext(conf=conf).getOrCreate()

In [2]:
sc

## Variables

In [3]:
cgms_data = 'cgms.csv'
meal_time_data = 'meal_time.csv'

In [4]:
from pyspark.sql import SQLContext
# assuming the spark environemnt is set and sc is spark.sparkContext
sqlContext = SQLContext(sc)

In [5]:
header = sc.textFile(meal_time_data).first()
new_column = ['user', 'time', 'label']

meal_time_df = sc.textFile(meal_time_data).filter(lambda x: x != header)\
                 .map(lambda x: x.split(','))\
                 .map(lambda x: (x[0], x[2], 1)).toDF()

for c, n in zip(meal_time_df.columns, new_column):
    meal_time_df = meal_time_df.withColumnRenamed(c, n)

In [6]:
meal_time_df.show(3)

+-------------+--------+-----+
|         user|    time|label|
+-------------+--------+-----+
|T-04_IPEN_Run|03:31:00|    1|
|T-04_IPEN_Run|05:20:00|    1|
|T-04_IPEN_Run|10:24:00|    1|
+-------------+--------+-----+
only showing top 3 rows



In [7]:
header = sc.textFile(cgms_data).first()

cgms_df = sc.textFile(cgms_data).filter(lambda x: x != header)\
    .map(lambda x: x.split(','))\
    .map(lambda x: (x[0], x[1], float(x[2]), float(x[3]),
                    float(x[4]), float(x[5]), float(x[6]),
                    float(x[7]), float(x[8]), float(x[9]), 
                    float(x[10]), float(x[11]), float(x[12]),
                    float(x[13]), float(x[14]), float(x[15]))).toDF()

In [8]:
cgms_df.show(3)

+-------------+--------+-----------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+------------------+-----------------+------------------+----+------------------+-----------------+
|           _1|      _2|               _3|                  _4|                 _5|                  _6|                 _7|                 _8|                  _9|               _10|               _11|              _12|               _13| _14|               _15|              _16|
+-------------+--------+-----------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+------------------+-----------------+------------------+----+------------------+-----------------+
|T-04_IPEN_Run|00:00:00|83.16726152243767|5.260906662625613E-4|0.21077864128950047|3.791610828123643...| 0.2110929868794306|0.23817284490383425| -0.374

In [16]:
cond = [cgms_df._1 == meal_time_df.user, cgms_df._2 == meal_time_df.time]
cgms_meal_df = cgms_df.join(meal_time_df, cond, how='left').orderBy("user", "time")

In [17]:
cgms_meal_df.show(3)

+---------------+--------+-----------------+--------------------+-------------------+--------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+----+----+-----+
|             _1|      _2|               _3|                  _4|                 _5|                  _6|                 _7|                 _8|                 _9|               _10|               _11|               _12|               _13|               _14|              _15|               _16|user|time|label|
+---------------+--------+-----------------+--------------------+-------------------+--------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+----+----+-----+
|T-01_3h_PEN_Run|04:15:00|80.50837947452847|0.003488583

In [11]:
cgms_meal_df.filter("label = 1").show(3)

+-------------+--------+-----------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+-------------+--------+-----+
|           _1|      _2|               _3|                  _4|                 _5|                  _6|                 _7|                 _8|                  _9|               _10|               _11|               _12|               _13|              _14|              _15|               _16|         user|    time|label|
+-------------+--------+-----------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+-------------+--------+-----+
|T-02_IPEN_Run|02:41:0

In [34]:
cgms_meal_df = cgms_meal_df.select("_1", "_2", "_3", "_4", "_5", "_6", "_7", "_8",
                                   "_9", "_10", "_11", "_12", "_13", "_14", "_15",
                                   "_16", "label")

In [35]:
cgms_meal_df.show(5)

+---------------+--------+-----------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----+
|             _1|      _2|               _3|                  _4|                 _5|                  _6|                 _7|                 _8|                  _9|               _10|               _11|               _12|              _13|              _14|               _15|               _16|label|
+---------------+--------+-----------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----+
|T-01_3h_PEN_Run|04:08:00|80.54046250241346|0.003474725485632051|0.20687606709133022|

In [37]:
#cgms_meal_df.write.save("cgms_meal_df.parquet", format="parquet")

In [38]:
cgms_meal_rdd = cgms_meal_df.rdd

In [40]:
cgms_meal_rdd = cgms_meal_rdd.map(lambda x : ((x[0], x[1]), (x[2:])))

In [42]:
cgms_meal_rdd.sortByKey().collect()

[(('T-01_3h_PEN_Run', '00:00:00'),
  (79.1986141022791,
   0.0009189730303311034,
   0.2093864879120009,
   0.0002657456528827042,
   0.21024794586888992,
   0.36552564404145577,
   -0.3710235350848133,
   0.9851818266943612,
   1.8664751140790992,
   76.83687197864636,
   12.081779270491767,
   36.6,
   34.100306800491744,
   107.34629907804702,
   None)),
 (('T-01_3h_PEN_Run', '00:00:00'),
  (82.53274812153698,
   0.0037557056439338357,
   0.2065715837996449,
   0.00028723269574478335,
   0.21104494384352904,
   0.4254669387971085,
   -0.5026265249658205,
   0.8464872378672037,
   2.4514175691999003,
   71.98796030056276,
   6.003758020636827,
   37.10000007005861,
   33.20233551125462,
   122.08801514869707,
   None)),
 (('T-01_3h_PEN_Run', '00:01:00'),
  (79.20005721706292,
   0.0009334746541152724,
   0.20937298245355615,
   0.00026572277715397,
   0.21024641734300578,
   0.3657906282075571,
   -0.36562851229898313,
   1.000443389678651,
   1.8455064017678506,
   71.72083377131597