## Migrating from Spark to BigQuery via Dataproc -- Part 1

* [Part 1](01_spark.ipynb): The original Spark code, now running on Dataproc (lift-and-shift).
* [Part 2](02_gcs.ipynb): Replace HDFS with Google Cloud Storage. This enables job-specific clusters. (cloud-native)
* [Part 3](03_automate.ipynb): Automate everything, so that we can run in a job-specific cluster. (cloud-optimized)
* [Part 4](04_bigquery.ipynb): Load CSV into BigQuery, use BigQuery. (modernize)
* [Part 5](05_functions.ipynb): Using Cloud Functions, launch analysis every time there is a new file in the bucket. (serverless)


In [20]:
%%writefile spark_analysis.py

import matplotlib
matplotlib.use('agg')

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--bucket", help="bucket for input and output")
args = parser.parse_args()

BUCKET = args.bucket

Overwriting spark_analysis.py


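The `%%writefile spark_analysis.py` Jupyter magic writes the contents of the cell to `spark_analysis.py`, overwriting the file if it already exists (hence the `Overwriting spark_analysis.py` output). The remaining cells use `%%writefile -a spark_analysis.py`, which appends each cell's contents to the same file, so that together the cells build a standalone Python script.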
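The difference between `%%writefile` and `%%writefile -a` comes down to the file mode. The following is a minimal sketch of that behavior in plain Python, assuming the magics simply write the cell body in `"w"` or `"a"` mode; `write_cell` is a hypothetical helper, and its return values mirror the status messages printed under the cells.

```python
from pathlib import Path


def write_cell(path: str, source: str, append: bool = False) -> str:
    """Sketch of %%writefile (append=False) and %%writefile -a (append=True)."""
    target = Path(path)
    existed = target.exists()
    # "w" truncates the file first; "a" adds the cell body to the end
    mode = "a" if append else "w"
    with target.open(mode, encoding="utf-8") as handle:
        handle.write(source)
    # Status message similar to the one printed under each cell
    if not existed:
        return f"Writing {path}"
    return f"Appending to {path}" if append else f"Overwriting {path}"
```

Calling it once without `append` and then repeatedly with `append=True` reproduces the cell sequence in this notebook.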

The first cell also imports the matplotlib module and selects the non-interactive `agg` backend with `matplotlib.use('agg')`. This backend renders plots to image files without needing a display, so the plotting code also runs as a standalone script outside a Jupyter notebook.
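The first cell also reads the bucket name from the command line with `argparse`. Passing an explicit argument list to `parse_args()` lets you check the parsing without launching Spark. This is a minimal sketch: the `parse_bucket` wrapper is hypothetical, and `required=True` makes a missing `--bucket` fail immediately rather than when the script first uses the bucket.

```python
import argparse
from typing import List, Optional


def parse_bucket(argv: Optional[List[str]] = None) -> str:
    """Return the --bucket value; argv=None falls back to sys.argv[1:]."""
    parser = argparse.ArgumentParser()
    # required=True makes argparse exit with an error if --bucket is missing
    parser.add_argument("--bucket", required=True, help="bucket for input and output")
    return parser.parse_args(argv).bucket
```

For example, `parse_bucket(["--bucket=my-bucket"])` returns `"my-bucket"` (a hypothetical bucket name).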

### Reading in data

The data are a gzip-compressed CSV file. In Spark, it can be read with `sc.textFile()`, which decompresses `.gz` files transparently, and each row can then be split on commas.
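For comparison, here is a single-machine analogue using only the Python standard library: `gzip.open()` in text mode decompresses on the fly, much as `textFile()` does for `.gz` input. This is a sketch; `read_gzipped_csv` is a hypothetical helper, and it uses `csv.reader`, which also handles quoted fields, whereas the notebook splits on commas directly (the two agree as long as no field contains a quoted comma).

```python
import csv
import gzip
from typing import Iterator, List


def read_gzipped_csv(path: str) -> Iterator[List[str]]:
    """Yield each row of a gzip-compressed CSV file as a list of strings."""
    # "rt" = read text; gzip decompresses transparently while iterating
    with gzip.open(path, "rt", newline="") as handle:
        yield from csv.reader(handle)
```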

The only change from the original code is to store the Cloud Storage bucket name in a variable and point `data_file` at the bucket that holds the source data. Moving from HDFS to Cloud Storage only requires changing the storage reference from `hdfs://` to `gs://`.
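If existing job code builds its paths from `hdfs://` URIs, the repointing can be expressed as a small string transformation. A minimal sketch, assuming the files were copied into the bucket under the same path they had in HDFS; `hdfs_to_gcs` and the bucket name `my-bucket` are hypothetical.

```python
from urllib.parse import urlparse


def hdfs_to_gcs(uri: str, bucket: str) -> str:
    """Repoint an hdfs:// URI at the same object path inside a Cloud Storage bucket."""
    parsed = urlparse(uri)
    if parsed.scheme != "hdfs":
        raise ValueError(f"not an hdfs:// URI: {uri}")
    # Drop the namenode host/port and keep only the path; strip leading
    # slashes so the result has exactly one "/" after the bucket name.
    return f"gs://{bucket}/{parsed.path.lstrip('/')}"
```

For example, `hdfs_to_gcs("hdfs:///kddcup.data_10_percent.gz", "my-bucket")` returns `"gs://my-bucket/kddcup.data_10_percent.gz"`.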

In [21]:
%%writefile -a spark_analysis.py

from pyspark.sql import SparkSession, SQLContext, Row

gcs_bucket = BUCKET  # same bucket that was passed in via --bucket
spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "gs://" + gcs_bucket + "/kddcup.data_10_percent.gz"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)

Appending to spark_analysis.py


In [22]:
%%writefile -a spark_analysis.py

csv_rdd = raw_rdd.map(lambda row: row.split(","))
parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]), 
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=int(r[14]),
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
    )
)
parsed_rdd.take(5)

Appending to spark_analysis.py
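The parsing cell picks specific 0-based positions out of each 42-field KDD Cup 99 record (41 features followed by the label). Writing the same mapping in plain Python makes the indices easier to check. This is a sketch with a hypothetical `parse_record` helper; it treats `su_attempted` as an integer so that it can be summed like the other counters.

```python
from typing import Dict, Union

# 0-based column positions of the numeric fields kept by the notebook
INT_FIELDS = {
    "duration": 0, "src_bytes": 4, "dst_bytes": 5, "wrong_fragment": 7,
    "urgent": 8, "hot": 9, "num_failed_logins": 10, "num_compromised": 12,
    "su_attempted": 14, "num_root": 15, "num_file_creations": 16,
}
# 0-based column positions of the categorical fields
STR_FIELDS = {"protocol_type": 1, "service": 2, "flag": 3}


def parse_record(line: str) -> Dict[str, Union[int, str]]:
    """Turn one comma-separated KDD Cup 99 line into a dict of selected fields."""
    r = line.split(",")
    record: Dict[str, Union[int, str]] = {name: int(r[i]) for name, i in INT_FIELDS.items()}
    record.update({name: r[i] for name, i in STR_FIELDS.items()})
    record["label"] = r[-1]  # last field, e.g. "normal." or an attack name
    return record
```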


### Spark analysis

One way to analyze data in Spark is to call methods on a dataframe.

In [23]:
%%writefile -a spark_analysis.py

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(parsed_rdd)
connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False)
connections_by_protocol.show()

Appending to spark_analysis.py
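On a single machine, `groupBy('protocol_type').count().orderBy('count', ascending=False)` amounts to a frequency count sorted from most to least common. A minimal sketch with `collections.Counter`; Spark computes the same kind of result but distributes the counting across partitions. `count_by_protocol` is a hypothetical name.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def count_by_protocol(protocols: Iterable[str]) -> List[Tuple[str, int]]:
    """Return (protocol, count) pairs, most frequent first."""
    # most_common() with no argument returns every key, sorted by count descending
    return Counter(protocols).most_common()
```

For example, `count_by_protocol(["tcp", "udp", "tcp", "icmp", "icmp", "icmp"])` returns `[("icmp", 3), ("tcp", 2), ("udp", 1)]`.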


Another way is to register the DataFrame as a temporary view and query it with Spark SQL.

In [24]:
%%writefile -a spark_analysis.py

df.registerTempTable("connections")
attack_stats = sqlContext.sql("""
                           SELECT 
                             protocol_type, 
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as total_freq,
                             ROUND(AVG(src_bytes), 2) as mean_src_bytes,
                             ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
                             ROUND(AVG(duration), 2) as mean_duration,
                             SUM(num_failed_logins) as total_failed_logins,
                             SUM(num_compromised) as total_compromised,
                             SUM(num_file_creations) as total_file_creations,
                             SUM(su_attempted) as total_root_attempts,
                             SUM(num_root) as total_root_accesses
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY total_freq DESC
                           """)
attack_stats.show()

Appending to spark_analysis.py
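The `CASE` expression collapses every label other than `normal.` into `attack`, and grouping on `(protocol_type, state)` produces one row per combination. A trimmed-down version of the same query can be tried locally with Python's built-in `sqlite3` module. This is a sketch: the three-column table and the `attack_summary` helper are hypothetical, and SQLite's SQL dialect is not identical to Spark SQL's.

```python
import sqlite3
from typing import Iterable, List, Tuple

QUERY = """
SELECT protocol_type,
       CASE label WHEN 'normal.' THEN 'no attack' ELSE 'attack' END AS state,
       COUNT(*) AS total_freq,
       ROUND(AVG(src_bytes), 2) AS mean_src_bytes
FROM connections
GROUP BY protocol_type, state
ORDER BY total_freq DESC
"""


def attack_summary(rows: Iterable[Tuple[str, int, str]]) -> List[tuple]:
    """Load (protocol_type, src_bytes, label) rows into SQLite and run QUERY."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE connections (protocol_type TEXT, src_bytes INTEGER, label TEXT)")
        conn.executemany("INSERT INTO connections VALUES (?, ?, ?)", rows)
        return conn.execute(QUERY).fetchall()
    finally:
        conn.close()
```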


In [25]:
%%writefile -a spark_analysis.py

ax = attack_stats.toPandas().plot.bar(x='protocol_type', subplots=True, figsize=(10,25))

Appending to spark_analysis.py


In [26]:
%%writefile -a spark_analysis.py

ax[0].get_figure().savefig('report.png');


Appending to spark_analysis.py


In [27]:
%%writefile -a spark_analysis.py

import google.cloud.storage as gcs
bucket = gcs.Client().get_bucket(BUCKET)
for blob in bucket.list_blobs(prefix='sparktodp/'):
    blob.delete()
bucket.blob('sparktodp/report.png').upload_from_filename('report.png')

Appending to spark_analysis.py
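Cloud Storage has a flat namespace, so `sparktodp/` is not a real folder: `list_blobs(prefix='sparktodp/')` matches every object whose name begins with that string. Deleting those objects first means that a rerun does not leave stale outputs under the prefix. The sketch below models a bucket as a Python dict, a hypothetical in-memory stand-in rather than the `google-cloud-storage` API, to show the effect; `replace_under_prefix` is likewise a hypothetical name.

```python
from typing import Dict


def replace_under_prefix(bucket: Dict[str, bytes], prefix: str, name: str, data: bytes) -> None:
    """Delete every object whose name starts with prefix, then store prefix + name."""
    # Collect matching keys first so the dict is not modified while iterating it
    stale = [key for key in bucket if key.startswith(prefix)]
    for key in stale:
        del bucket[key]
    bucket[prefix + name] = data
```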


In [28]:
%%writefile -a spark_analysis.py

connections_by_protocol.write.format("csv").mode("overwrite").save(
    "gs://{}/sparktodp/connections_by_protocol".format(BUCKET))

Appending to spark_analysis.py


### Test Automation

In [29]:
# The lab's bucket has the same name as the project ID, so look that up with gcloud
BUCKET_list = !gcloud info --format='value(config.project)'
BUCKET = BUCKET_list[0]
print('Writing to {}'.format(BUCKET))
!/opt/conda/miniconda3/bin/python spark_analysis.py --bucket=$BUCKET


Writing to qwiklabs-gcp-03-e88cc63f6e1f
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/04 01:07:19 INFO SparkEnv: Registering MapOutputTracker
24/04/04 01:07:19 INFO SparkEnv: Registering BlockManagerMaster
24/04/04 01:07:19 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/04/04 01:07:19 INFO SparkEnv: Registering OutputCommitCoordinator
                                                                                

The following command lists the output files that the script saved to your Cloud Storage bucket.

In [30]:
!gcloud storage ls gs://$BUCKET/sparktodp/**

gs://qwiklabs-gcp-03-e88cc63f6e1f/sparktodp/connections_by_protocol/
gs://qwiklabs-gcp-03-e88cc63f6e1f/sparktodp/connections_by_protocol/_SUCCESS
gs://qwiklabs-gcp-03-e88cc63f6e1f/sparktodp/connections_by_protocol/part-00000-bffea1c8-7017-44fd-ad2e-bf869b30eacd-c000.csv
gs://qwiklabs-gcp-03-e88cc63f6e1f/sparktodp/report.png
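Note that `connections_by_protocol` is a directory rather than a single CSV file: Spark writes one `part-*` file per partition plus a `_SUCCESS` marker file when the job commits. After downloading the directory (for example with `gcloud storage cp --recursive`), the parts can be read back together. A minimal sketch; `read_spark_csv_dir` is a hypothetical helper, and it assumes the default CSV writer settings used above, which produce no header row.

```python
import csv
from pathlib import Path
from typing import List


def read_spark_csv_dir(directory: str) -> List[List[str]]:
    """Concatenate the rows of every part-*.csv file Spark wrote into directory."""
    rows: List[List[str]] = []
    # Only the part files hold data; the _SUCCESS marker is skipped by the glob
    for part in sorted(Path(directory).glob("part-*.csv")):
        with part.open(newline="") as handle:
            rows.extend(csv.reader(handle))
    return rows
```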


Copy the Python script to Cloud Storage so that it persists independently of the cluster.

In [31]:
!gcloud storage cp spark_analysis.py gs://$BUCKET/sparktodp/spark_analysis.py


Copying file://spark_analysis.py to gs://qwiklabs-gcp-03-e88cc63f6e1f/sparktodp/spark_analysis.py
  Completed files 1/1 | 2.8kiB/2.8kiB                                          
