# 1. Welcome. Introduction to Spark and the dataset

Let's import some libraries

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import gzip
import re
from scipy import signal
import numpy as np

import pyspark.sql.functions as F

from utils import start_spark_session, get_s3, list_s3

Our dataset lives in S3

In [None]:
# list_s3 is a project helper from utils; presumably it returns the dataset's
# file keys -- verify against utils.
lf = list_s3()
print(len(lf), 'files')
# Last expression of the cell: the notebook displays the first five entries.
lf[:5]

Let's check one of them

In [None]:
# Fetch one log file; the second argument is the local path that the next
# cell opens with gzip.
get_s3('configs/0/batch_00.log.gz', 'tmp.log.gz')

In [None]:
# Peek at the first five lines of the downloaded log.
# The file handle is iterated lazily: readlines() would decompress and hold
# every line in memory just to show five of them. range(5) comes first in the
# zip so that no sixth line is consumed from the file.
with gzip.open('tmp.log.gz', 'rt') as f:
    for _, line in zip(range(5), f):
        print(line.strip())

The columns are:

```
time, cycle, conf, run, x, y, z
```

We will see what they mean in a bit. For now let's write some code that automates parsing those files.

In [None]:
def process(src):
    """Download one gzipped log via ``get_s3``, parse every line, count them.

    Each line is split on ':' or ',' into exactly seven fields
    (time, cycle, conf, run, x, y, z). The fields are converted to numbers,
    which validates them; the parsed values themselves are discarded.

    Args:
        src: Source path handed to ``get_s3``.

    Returns:
        The number of lines parsed (0 for an empty file).

    Raises:
        ValueError: If a line does not split into exactly seven fields, or
            a field is not a valid int/float.
    """
    dst = "tmp.log.gz"  # local download target; left on disk afterwards
    get_s3(src, dst)
    separators = re.compile(r'[:,]')  # compiled once, outside the loop
    count = 0
    with gzip.open(dst, 'rt') as f:
        # Stream line by line: readlines() would hold the whole decompressed
        # file in memory at once.
        for line in f:
            time, cycle, conf, run, x, y, z = separators.split(line.strip())
            cycle, conf, run = map(int, (cycle, conf, run))
            time, x, y, z = map(float, (time, x, y, z))
            count += 1
    return count

Note that the parsing logic above is only about a dozen lines, and there are several things missing:

    * No cleanup
    * No retries in case of failure
    * Not able to process many files in parallel
    * Explicitly tailored to gzip files

In [None]:
%%time

# Run the pure-Python parser on a single file; the %%time cell magic on the
# first line of the cell reports how long the whole cell took.
count = process('configs/0/batch_00.log.gz')
print(count)

In [None]:
# Shell escape: show the size of the local copy that process() left on disk.
!ls -lah tmp.log.gz

Parsing a single file takes a few seconds. If the other files are about the same size, we would project roughly a minute for all 10 of them.

Let's try Spark. We start the Spark session with a utility function...

In [None]:
# start_spark_session is a project helper from utils; how it configures the
# session is not shown in this notebook.
spark = start_spark_session()

You can click on the 'Spark UI' link below. Spend some time exploring the screen. It will become more interesting soon.

In [1]:
# Bare expression: the notebook renders the session object as the cell output.
spark

Let's do exactly the same thing as before with Spark.

In [None]:
%time
base_df = spark.read.text('s3a://enginestream/configs/0/batch_00.log.gz')
print(base_df.count())

Let's do it for all (10) files in that directory (by using the '*' wildcard)

In [None]:
%time
base_df = spark.read.text('s3a://enginestream/configs/0/batch_*.log.gz')
print(base_df.count())

It takes far less than the minute or so we projected for the pure Python version. Why? Because

1. it uses concurrency and
2. the work runs inside Spark's JVM engine (Spark is written in Scala), not in the Python interpreter

We can do the full parsing with code that is similar to, but slightly more expressive than, the pure Python code.

In [None]:
# Target schema: (column name, Spark SQL type), in the order the fields
# appear on each log line.
fields = [('time', 'float'),
          ('cycle', 'long'),
          ('conf', 'long'),
          ('run', 'long'),
          ('x', 'float'),
          ('y', 'float'),
          ('z', 'float')]

# Split every raw text line on ':' or ',' into an array column, then pick
# each element by position and cast it to its type. The final select keeps
# only the typed columns, so the intermediate 'cols' array is already gone
# and needs no separate drop.
logs_df = (base_df
           .select(F.split('value', '[:,]').alias('cols'))
           .select(*[F.col('cols')[i].cast(dtype).alias(name)
                     for i, (name, dtype) in enumerate(fields)]))

logs_df.show(10, truncate=False)

We can easily get Pandas dataframes that look better and allow easy analysis

In [None]:
# limit(10) is applied in Spark before the conversion, so the Pandas frame
# holds at most ten rows.
logs_df.limit(10).toPandas()

Our `logs_df` data has a schema implicitly defined

In [None]:
# Show the column names and types that resulted from the casts applied when
# logs_df was built.
logs_df.printSchema()

We can write SQL queries using the API (i.e. Python). It will run against the entire dataset right now (all 10 files).

```
Compressed size: 273 Mb
Uncompressed size: 832 Mb
Rows: 17,222,161
```

In [None]:
# Smallest and largest run and cycle ids over the whole dataset, in the
# order min(run), max(run), min(cycle), max(cycle).
extremes = [aggregate(column)
            for column in ('run', 'cycle')
            for aggregate in (F.min, F.max)]
logs_df.select(*extremes).show()

Alternatively we can use the usual SQL.

In [None]:
# Register the frame as a temp view so it can be queried by name in SQL.
logs_df.createOrReplaceTempView("logs")

# Adjacent string literals are concatenated verbatim, so the first piece must
# end with a space; without it the query text reads "...MAX(cycle)FROM logs".
pdf = (spark.sql("SELECT MIN(run), MAX(run), MIN(cycle), MAX(cycle) "
                 "FROM logs")
       .toPandas())
pdf

Let's investigate the `run` column. How many rows are there for every run?

In [None]:
# Row count per run, largest run first.
rows_per_run_sql = ("SELECT run, count(1) AS cnt "
                    "FROM logs "
                    "GROUP BY run "
                    "ORDER BY cnt DESC")
pdf = spark.sql(rows_per_run_sql).toPandas()
pdf

Let's extract all data for the smallest run to investigate further.

In [None]:
# Collect every row of run 2001 (hard-coded run id) into a Pandas frame.
# This rebinds `pdf`, which the plotting cells below read.
pdf = spark.sql("SELECT * FROM logs WHERE run = 2001").toPandas()
pdf

Since it's a Pandas dataframe, it's trivial to plot it or do a histogram of e.g. the `x` field.

In [None]:
# Line plot of x against the row index; assigning to `_` keeps the returned
# object from being echoed as the cell output.
_ = pdf['x'].plot()

In [None]:
# Distribution of x in 50 bins; assigning to `_` keeps the returned object
# from being echoed as the cell output.
_ = pdf['x'].hist(bins=50)

Let's plot more of the fields

In [None]:
# One panel per field. ax is a 2x2 array, and ax.flat walks it row by row,
# so the layout is: cycle, x on the top row and y, z on the bottom row.
fig, ax = plt.subplots(nrows=2, ncols=2)

for panel, field in zip(ax.flat, ('cycle', 'x', 'y', 'z')):
    panel.plot(pdf[field])
    panel.set_title(field)  # label the panel so the grid is readable

# Keep the titles from overlapping the neighbouring panels' tick labels.
fig.tight_layout()

Let's zoom in on `x` and `y` for the first few cycles

In [None]:
end = 1800

# Overlay the first `end` samples of both signals on a single axis.
for column in ('x', 'y'):
    samples = pdf[column]
    plt.plot(samples[:end])

They seem to be out of phase in a consistent way. Let's plot them against each other, with `x` on one axis and `y` on the other.

In [None]:
end = 2500

# Plot y against x (instead of each against the row index) for the first
# `end` samples. The unused `offset` assignment was dropped from this cell.
plt.plot(pdf['x'][:end], pdf['y'][:end])

Interesting. They are connected, and have a consistent relationship, since they're ellipses. But their centers seem to move in and out. Maybe we can see their change in time by using the cycle variable.

In [None]:
start = 35000
end = 40000
offset = 2

# Shift every sample horizontally by offset * cycle so that consecutive
# cycles are drawn side by side instead of on top of each other.
# Column arithmetic on the sliced frame replaces the element-by-element
# Python loop and produces the same values.
window = pdf[start:end]
plt.plot(window['x'] + offset * window['cycle'], window['y'])

Ok - we are convinced that their amplitudes are related and change in time in a sinusoidal way. Is their frequency stable over time?

In [None]:
end = 20000
fs = 100  # passed to stft as the sampling frequency

# Short-time Fourier transform of the first `end` samples of x, drawn as a
# time/frequency map of the magnitude and cropped to the 0-2 frequency band.
f, t, Zxx = signal.stft(pdf['x'][:end], fs=fs, nperseg=1000)
magnitude = np.abs(Zxx)
plt.pcolormesh(t, f, magnitude, vmin=0, vmax=3)
plt.ylim(0, 2)

No, the frequency goes up and down as well! What does the histogram of the duration of a cycle look like?

In [None]:
# Duration of each cycle of run 2001: the time span between its first and
# last sample, longest cycle first.
cycle_duration_sql = ("SELECT MAX(time) - MIN(time) AS duration "
                      "FROM logs "
                      "WHERE run = 2001 "
                      "GROUP BY cycle "
                      "ORDER BY duration DESC")
pdf = spark.sql(cycle_duration_sql).toPandas()
pdf

In [None]:
# Distribution of the per-cycle durations in 50 bins.
pdf['duration'].hist(bins=50)

The low ones may be noise. It seems that the duration is between 0.08 and 0.12 units. I wonder whether that's the case for every run. Let's run it on the entire dataset.

In [None]:
# Same duration computation, now for every (run, cycle) pair in the dataset.
all_durations_sql = ("SELECT run, MAX(time) - MIN(time) AS duration "
                     "FROM logs "
                     "GROUP BY run, cycle "
                     "ORDER BY duration DESC")
pdf = spark.sql(all_durations_sql).toPandas()

durations = pdf['duration']
durations.hist(bins=50)

Seems like we have an outlier (measurement error) around 6000. Let's remove it:

In [None]:
# Histogram of the durations once everything at or above 6000 is excluded.
below_outlier = pdf['duration'] < 6000
pdf.loc[below_outlier, 'duration'].hist(bins=50)

In [None]:
# Summary statistics of the durations once everything at or above 6000 is
# excluded.
below_outlier = pdf['duration'] < 6000
pdf.loc[below_outlier, 'duration'].describe()

In [None]:
# 95th and 99th percentile of the durations below 6000, shown as a pair.
trimmed = pdf.loc[pdf['duration'] < 6000, 'duration']
(trimmed.quantile(q=0.95), trimmed.quantile(q=0.99))

Ok, so the range between 0.08 and 0.12 units is indeed the most common. The 95th percentile is 0.17 and the 99th is 0.23.

In [None]:
# Stop the Spark session now that the notebook is finished.
spark.stop()