## Migrating from Spark to BigQuery via Dataproc -- Part 3

* [Part 1](01_spark.ipynb): The original Spark code, now running on Dataproc (lift-and-shift).
* [Part 2](02_gcs.ipynb): Replace HDFS with Google Cloud Storage. This enables job-specific clusters. (cloud-native)
* [Part 3](03_automate.ipynb): Automate everything, so that we can run in a job-specific cluster. (cloud-optimized)
* [Part 4](04_bigquery.ipynb): Load CSV into BigQuery, use BigQuery. (modernize)
* [Part 5](05_functions.ipynb): Using Cloud Functions, launch analysis every time there is a new file in the bucket. (serverless)


### Catch up: data to GCS

In [2]:
# Catch-up cell. Run this only if you skipped the previous notebooks in this sequence.
!wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz
BUCKET='julio_demo'  # CHANGE
!pip install google-compute-engine
!gsutil cp kdd* gs://$BUCKET/

--2020-03-02 13:59:20--  http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz
Resolving kdd.ics.uci.edu (kdd.ics.uci.edu)... 128.195.1.86
Connecting to kdd.ics.uci.edu (kdd.ics.uci.edu)|128.195.1.86|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2144903 (2.0M) [application/x-gzip]
Saving to: ‘kddcup.data_10_percent.gz.4’


2020-03-02 13:59:21 (4.66 MB/s) - ‘kddcup.data_10_percent.gz.4’ saved [2144903/2144903]

Copying file://kddcup.data_10_percent.gz [Content-Type=application/octet-stream]...
Copying file://kddcup.data_10_percent.gz.1 [Content-Type=application/octet-stream]...
Copying file://kddcup.data_10_percent.gz.2 [Content-Type=application/octet-stream]...
Copying file://kddcup.data_10_percent.gz.3 [Content-Type=application/octet-stream]...
\ [4 files][  8.2 MiB/  8.2 MiB]                                                
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -

In [3]:
BUCKET='julio_demo'  # CHANGE
!gsutil ls gs://$BUCKET/kdd*

gs://julio_demo/kddcup.data_10_percent.gz
gs://julio_demo/kddcup.data_10_percent.gz.1
gs://julio_demo/kddcup.data_10_percent.gz.2
gs://julio_demo/kddcup.data_10_percent.gz.3
gs://julio_demo/kddcup.data_10_percent.gz.4


### Create a Python file

Put all the code in a Python file. We can comment out display-only code such as ```take()``` and ```show()```.
Make changeable settings like ```BUCKET``` come from command-line arguments, parsed here with ```argparse```.

In [4]:
%%writefile spark_analysis.py

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--bucket", help="bucket for input and output")
args = parser.parse_args()

BUCKET = args.bucket

Writing spark_analysis.py
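
The argument handling above can be tried without a shell by handing the parser an explicit list. This is a minimal sketch, not part of `spark_analysis.py`: the `parse_args` wrapper, the `required=True` setting, and the bucket name are assumptions added for illustration.

```python
import argparse


def parse_args(argv=None):
    """Parse command-line settings; argv=None falls back to sys.argv[1:]."""
    parser = argparse.ArgumentParser(description="KDD connection analysis")
    # required=True is an assumption: it makes the script fail fast
    # instead of running with BUCKET set to None.
    parser.add_argument("--bucket", required=True,
                        help="bucket for input and output")
    return parser.parse_args(argv)


# Passing a list stands in for the real command line.
args = parse_args(["--bucket", "my-example-bucket"])  # hypothetical bucket name
print(args.bucket)
```

Both `--bucket NAME` and `--bucket=NAME` are accepted by `argparse`, so the `--bucket=$BUCKET` form used later in this notebook parses the same way.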


In [5]:
%%writefile -a spark_analysis.py

from pyspark.sql import SparkSession, SQLContext, Row

spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "gs://{}/kddcup.data_10_percent.gz".format(BUCKET)
raw_rdd = sc.textFile(data_file).cache()
#raw_rdd.take(5)

Appending to spark_analysis.py


In [6]:
%%writefile -a spark_analysis.py

csv_rdd = raw_rdd.map(lambda row: row.split(","))
parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]), 
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=r[14],
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
    )
)
#parsed_rdd.take(5)

Appending to spark_analysis.py
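
The column-index mapping in the `Row(...)` call can be checked without a cluster. The sketch below mirrors the same indices in plain Python; `FIELDS`, `parse_line`, and `SAMPLE_LINE` are hypothetical names, and the sample is an illustrative line in the dataset's comma-separated layout rather than output from this notebook.

```python
# Column index -> (field name, converter), mirroring the Row(...) call above.
FIELDS = {
    0: ("duration", int),
    1: ("protocol_type", str),
    2: ("service", str),
    3: ("flag", str),
    4: ("src_bytes", int),
    5: ("dst_bytes", int),
    7: ("wrong_fragment", int),
    8: ("urgent", int),
    9: ("hot", int),
    10: ("num_failed_logins", int),
    12: ("num_compromised", int),
    14: ("su_attempted", str),   # left as a string, as in the notebook
    15: ("num_root", int),
    16: ("num_file_creations", int),
    -1: ("label", str),          # the label is always the last field
}


def parse_line(line):
    """Split one CSV line and convert the selected columns."""
    parts = line.split(",")
    return {name: convert(parts[idx]) for idx, (name, convert) in FIELDS.items()}


# Illustrative 42-field line (an assumption, used only to exercise the parser).
SAMPLE_LINE = ("0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,"
               "0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,"
               "0.00,0.00,0.00,0.00,normal.")

record = parse_line(SAMPLE_LINE)
print(record["protocol_type"], record["src_bytes"], record["label"])
```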


In [7]:
%%writefile -a spark_analysis.py

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(parsed_rdd)
connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False)
connections_by_protocol.show()

Appending to spark_analysis.py


In [8]:
%%writefile -a spark_analysis.py

df.createOrReplaceTempView("connections")  # registerTempTable is deprecated since Spark 2.0
attack_stats = sqlContext.sql("""
                           SELECT 
                             protocol_type, 
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as total_freq,
                             ROUND(AVG(src_bytes), 2) as mean_src_bytes,
                             ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
                             ROUND(AVG(duration), 2) as mean_duration,
                             SUM(num_failed_logins) as total_failed_logins,
                             SUM(num_compromised) as total_compromised,
                             SUM(num_file_creations) as total_file_creations,
                             SUM(su_attempted) as total_root_attempts,
                             SUM(num_root) as total_root_accesses
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)
attack_stats.show()

Appending to spark_analysis.py
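
The shape of the query (a `CASE` expression aliased as `state`, grouping by that alias, ordering by the third column) can be sanity-checked on a few toy rows. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL, which is an assumption made for convenience; the two engines differ in other respects, and the rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE connections (protocol_type TEXT, label TEXT, src_bytes INTEGER)"
)
# Made-up rows: two attacks and one normal connection on tcp, one normal on udp.
conn.executemany(
    "INSERT INTO connections VALUES (?, ?, ?)",
    [
        ("tcp", "normal.", 100),
        ("tcp", "smurf.", 300),
        ("tcp", "neptune.", 500),
        ("udp", "normal.", 50),
    ],
)

# A cut-down version of the notebook's query with the same structure.
query = """
SELECT
  protocol_type,
  CASE label
    WHEN 'normal.' THEN 'no attack'
    ELSE 'attack'
  END AS state,
  COUNT(*) AS total_freq,
  ROUND(AVG(src_bytes), 2) AS mean_src_bytes
FROM connections
GROUP BY protocol_type, state
ORDER BY 3 DESC
"""
result = conn.execute(query).fetchall()
for row in result:
    print(row)
conn.close()
```

Every label other than `'normal.'` falls into the `'attack'` group, so the two tcp attacks are counted together even though their labels differ.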


In [9]:
%%writefile -a spark_analysis.py

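import matplotlib
matplotlib.use('agg')  # non-interactive backend: the script runs without a display
ax = attack_stats.toPandas().plot.bar(x='protocol_type', subplots=True, figsize=(10,25))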

Appending to spark_analysis.py


### Write out report

Make sure to copy the output to GCS so that we can safely delete the cluster. The script has to be pure Python, so replace the notebook shell commands with equivalent Python code.

In [10]:
%%writefile -a spark_analysis.py

ax[0].get_figure().savefig('report.png')
#!gsutil rm -rf gs://$BUCKET/sparktobq/
#!gsutil cp report.png gs://$BUCKET/sparktobq/

Appending to spark_analysis.py


In [11]:
%%writefile -a spark_analysis.py

import google.cloud.storage as gcs
bucket = gcs.Client().get_bucket(BUCKET)
for blob in bucket.list_blobs(prefix='sparktobq/'):
    blob.delete()
bucket.blob('sparktobq/report.png').upload_from_filename('report.png')

Appending to spark_analysis.py
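
The delete-then-upload step can also be wrapped in a small function that takes the bucket as a parameter, which makes it possible to exercise with a stand-in object before touching real storage. This is a sketch under assumptions: `replace_report` is a hypothetical helper, not part of `spark_analysis.py`, and it relies only on the `list_blobs`, `blob`, `delete`, and `upload_from_filename` calls already used in the cell above.

```python
import os


def replace_report(bucket, prefix, local_path):
    """Delete every blob under `prefix`, then upload `local_path` beneath it.

    `bucket` is expected to behave like a google.cloud.storage Bucket.
    Returns the name of the uploaded blob.
    """
    # Python equivalent of: gsutil rm -rf gs://$BUCKET/<prefix>
    for blob in bucket.list_blobs(prefix=prefix):
        blob.delete()
    # Python equivalent of: gsutil cp <local_path> gs://$BUCKET/<prefix>
    blob_name = prefix + os.path.basename(local_path)
    bucket.blob(blob_name).upload_from_filename(local_path)
    return blob_name


# Usage inside the script (requires credentials, so it is left as a comment):
#   import google.cloud.storage as gcs
#   replace_report(gcs.Client().get_bucket(BUCKET), 'sparktobq/', 'report.png')
```

As in the original cell, everything under the prefix is removed first, so any earlier output stored there is lost on each run.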


In [12]:
%%writefile -a spark_analysis.py

connections_by_protocol.write.format("csv").mode("overwrite").save(
    "gs://{}/sparktobq/connections_by_protocol".format(BUCKET))

Appending to spark_analysis.py


### Test automation

Run the script standalone, passing the bucket name on the command line.

In [14]:
BUCKET='julio_demo'  # CHANGE
print('Writing to {}'.format(BUCKET))
!python spark_analysis.py --bucket=$BUCKET

Writing to julio_demo
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/02 14:02:50 WARN org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
20/03/02 14:02:50 WARN org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
20/03/02 14:02:50 WARN org.apache.spark.scheduler.FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration.
[Stage 0:>                                                          (0 + 0) / 1]20/03/02 14:03:14 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
20/03/02 14:03:29

In [15]:
!gsutil ls gs://$BUCKET/sparktobq/**

gs://julio_demo/sparktobq/
gs://julio_demo/sparktobq/connections_by_protocol/
gs://julio_demo/sparktobq/connections_by_protocol/_SUCCESS
gs://julio_demo/sparktobq/connections_by_protocol/part-00000-36337da4-4544-4c59-b6ad-1e8186239241-c000.csv
gs://julio_demo/sparktobq/connections_by_protocol/part-00001-36337da4-4544-4c59-b6ad-1e8186239241-c000.csv
gs://julio_demo/sparktobq/connections_by_protocol/part-00002-36337da4-4544-4c59-b6ad-1e8186239241-c000.csv
gs://julio_demo/sparktobq/report.png
