In [1]:
import pyspark
from pyspark.sql import SparkSession
sc = pyspark.SparkContext('local[*]')

In [2]:
sc.version

'2.4.0'

In [3]:
spark = SparkSession.builder.getOrCreate()

In [7]:
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .load("EMG_data_for_gestures-master/*/*.txt")
)

In [8]:
df.count()

4237908

In [9]:
df.show(5)

+----+--------+--------+--------+--------+--------+--------+--------+--------+-----+
|time|channel1|channel2|channel3|channel4|channel5|channel6|channel7|channel8|class|
+----+--------+--------+--------+--------+--------+--------+--------+--------+-----+
|   0|  -1e-05|  -2e-05|  -1e-05|  -5e-05|  -6e-05|  -4e-05|  -1e-05|  -2e-05|    0|
|   3|  -2e-05|       0| 0.00011|   6e-05|-0.00013|  -4e-05|       0|   1e-05|    0|
|   4|  -2e-05|       0| 0.00011|   6e-05|-0.00013|  -4e-05|       0|   1e-05|    0|
|   5|  -2e-05|       0| 0.00011|   6e-05|-0.00013|  -4e-05|       0|   1e-05|    0|
|   6|  -2e-05|       0| 0.00011|   6e-05|-0.00013|  -4e-05|       0|   1e-05|    0|
+----+--------+--------+--------+--------+--------+--------+--------+--------+-----+
only showing top 5 rows



In [20]:
from pyspark.sql import functions as F
from pyspark.sql import types

In [19]:
df.dtypes

[('time', 'string'),
 ('channel1', 'string'),
 ('channel2', 'string'),
 ('channel3', 'string'),
 ('channel4', 'string'),
 ('channel5', 'string'),
 ('channel6', 'string'),
 ('channel7', 'string'),
 ('channel8', 'string'),
 ('class', 'string')]

In [32]:
df = df.withColumn("time", df["time"].cast(types.IntegerType()))

In [24]:
for i in range(1, 9):
    ch = "channel"+str(i)
    df = df.withColumn(ch, df[ch].cast(types.FloatType()))

In [33]:
df.dtypes

[('time', 'int'),
 ('channel1', 'float'),
 ('channel2', 'float'),
 ('channel3', 'float'),
 ('channel4', 'float'),
 ('channel5', 'float'),
 ('channel6', 'float'),
 ('channel7', 'float'),
 ('channel8', 'float'),
 ('class', 'string')]

In [30]:
df.groupBy("class").agg(
    F.count("time"),
    F.mean("time"),
    F.max("time"),
    F.max("channel1"),
).collect()

[Row(class='7', count(time)=13696, avg(time)=54512.66800525701, max(time)='78181', max(channel1)=0.0012700000079348683),
 Row(class='3', count(time)=249494, avg(time)=27437.843715680538, max(time)='9999', max(channel1)=0.0012700000079348683),
 Row(class='0', count(time)=2725157, avg(time)=31530.017472020878, max(time)='9999', max(channel1)=0.0012700000079348683),
 Row(class=None, count(time)=1, avg(time)=97144.0, max(time)='97144', max(channel1)=-1.9999999494757503e-05),
 Row(class='5', count(time)=251733, avg(time)=37979.939125978715, max(time)='69419', max(channel1)=0.0012700000079348683),
 Row(class='6', count(time)=253009, avg(time)=43169.45518538866, max(time)='76979', max(channel1)=0.0012700000079348683),
 Row(class='1', count(time)=250055, avg(time)=17544.549507108437, max(time)='999', max(channel1)=0.00015999999595806003),
 Row(class='4', count(time)=251570, avg(time)=32673.696760345032, max(time)='62163', max(channel1)=0.0007399999885819852),
 Row(class='2', count(time)=243193

In [38]:
df.agg(F.min("time")).collect()

[Row(min(time)=0)]

In [39]:
df.summary().collect()

[Row(summary='count', time='4237908', channel1='4237908', channel2='4237908', channel3='4237908', channel4='4237908', channel5='4237908', channel6='4237908', channel7='4237908', channel8='4237908', class='4237907'),
 Row(summary='mean', time='31136.89160170537', channel1='-7.911481221985641E-6', channel2='-9.416077347189007E-6', channel3='-9.548734826094754E-6', channel4='-9.637837728907124E-6', channel5='-1.5997241933525083E-5', channel6='-1.0855280385357427E-5', channel7='-9.364636874988767E-6', channel8='-9.696859735991815E-6', class='1.2656705302877105'),
 Row(summary='stddev', time='18680.79174887248', channel1='1.6311104693823503E-4', channel2='1.1922284688637004E-4', channel3='1.2418463026896086E-4', channel4='2.2577281974945991E-4', channel5='2.7241880261724184E-4', channel6='2.1514050941003502E-4', channel7='1.527310818136283E-4', channel8='1.720933468211266E-4', class='1.9896927735728438'),
 Row(summary='min', time='0', channel1='-0.00128', channel2='-0.00128', channel3='-0.0

In [40]:
(
    df.write.format('parquet')
    .bucketBy(100, 'class')
    .sortBy('time')
    .mode("overwrite")
    .saveAsTable('emg_data')
)