# Templates & Links for the Course M9
This notebook contains useful templates (e.g. shell commands, code snippets) as well as links for the course M9 (BI and Big Data Architectures).
<hr>

# Links to the Used Web Interfaces
- JupyterHub (for Python development with Jupyter Notebooks): see slides
 - user: see slides
 - password: provided in lecture
- Hue (the Hadoop web interface): see slides
 - user: see slides
 - password: provided in lecture


<hr>

# Lecture 2 - Big Data and Hadoop Fundamentals

## Set up the SSH connection to Cloudera
To connect to our on-premise Hadoop system (Cloudera), use an SSH client with the following settings:
- Host: see slides
- Port: 22
- User: see slides
- Password: provided in lecture

Windows: download Putty or MobaXterm (save the connection for later usage)<br>
Mac: open a terminal and execute <code>ssh ...</code>

## Connect to the local MySQL database
<code>mysql -u xxx -p -D xxx</code>
<br>
Password: provided in lecture<br>
Within the client:<br>
<code>show databases;</code><br>
<code>show tables;</code>

## HDFS NameNode and DataNode
- http://xxx:50070
- http://xxx:50075

## Lab 2.1: HDFS Introduction

In [None]:
hadoop fs -ls /user/cloudera/
hadoop fs -ls /user/cloudera/2_mr
hadoop fs -cat /user/cloudera/2_mr/orders.csv
echo 123456 > test_matrnr.txt
hadoop fs -mkdir /user/cloudera/students/.../test
hadoop fs -put test_matrnr.txt /user/cloudera/students/.../test
hadoop fs -ls /user/cloudera/students/.../
hadoop fs -cat /user/cloudera/students/.../test/test_matrnr.txt
hadoop fs -rm /user/cloudera/students/.../test/*
hadoop fs -rmdir /user/cloudera/students/.../test

## Lab 2.2: MapReduce Example

In [None]:
cd /home/cloudera/2_mr
hadoop jar wordcount.jar WordCount /user/cloudera/students/.../input/ /user/cloudera/students/.../output
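
The following is a minimal pure-Python sketch of the three MapReduce phases, assuming that `wordcount.jar` implements the classic word count (the jar's source is not shown here). The function names and the sample lines are hypothetical and only illustrate the idea; the real job runs distributed on the cluster.

```python
from collections import defaultdict

def map_phase(lines):
    """Emit a (word, 1) pair for every word, like a word-count mapper."""
    for line in lines:
        for word in line.split():
            yield (word, 1)

def shuffle_phase(pairs):
    """Group all emitted values by key, like the shuffle/sort step."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reduce_phase(grouped):
    """Sum the grouped values per key, like a word-count reducer."""
    return {key: sum(values) for key, values in grouped.items()}

# hypothetical sample input, not the lab data
sample_lines = ["big data is big", "data is everywhere"]
word_counts = reduce_phase(shuffle_phase(map_phase(sample_lines)))
print(word_counts)
```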

## GCP Registration

xxx

<hr>

# Lecture 3 - Batch Data Ingestion and Data Management

## Lab 3.1: Manual file upload

<code>hadoop fs -put sample-orders.csv /user/cloudera/students/.../3_batch_ingest/landing</code>

Workflow:<br>
- Source: <code>/user/cloudera/3_batch_ingest/landing/sample-orders.csv</code>
- Target: <code>/user/cloudera/3_batch_ingest/order_history/sample-orders_${wf:id()}.csv</code>

Link to Oozie:
- Workflows: http://xxx:8888/oozie/editor/workflow/list/
- Coordinators: http://xxx:8888/oozie/list_oozie_coordinators/

## Lab 3.2: Structured Data Upload with sqoop

<code>sqoop import --table categories --connect jdbc:mysql://localhost:3306/retail_db --username=xxx --password=xxx --target-dir /user/cloudera/students/.../3_batch_ingest/sqoop_categories --delete-target-dir</code>

## Lab 3.3: sqoop Extensions

<code>sqoop import --table order_items --connect jdbc:mysql://localhost:3306/retail_db --username=xxx --password=xxx --target-dir /user/cloudera/3_batch_ingest/sqoop_orderitems_parquet --delete-target-dir --as-parquetfile</code>

## Lab 3.4: Hive Access

<code>beeline
!connect jdbc:hive2://localhost:10000 xxx
show databases;
show tables;
describe tablename;</code>

## Hive Data Model - Partitioning

<code>CREATE TABLE Sales (sale_id INT, customer_id INT, amount FLOAT)
PARTITIONED BY (country STRING, year INT, month INT);</code>
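
Hive stores each partition of such a table in its own subdirectory, with one `column=value` level per partition column. The sketch below only builds such a path as a string to illustrate the layout. The function name, the warehouse location and the partition values are assumptions chosen for illustration.

```python
def partition_path(table_dir, partition_values):
    """Build the directory of one partition: one key=value level per partition column."""
    levels = ["{}={}".format(column, value) for column, value in partition_values]
    return "/".join([table_dir.rstrip("/")] + levels)

# hypothetical table location and partition values
path = partition_path(
    "/user/hive/warehouse/sales",
    [("country", "DE"), ("year", 2020), ("month", 1)],
)
print(path)
```

A query that filters on `country`, `year` or `month` only needs to read the matching directories, which is why partitioning speeds up selective queries.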

## Impala
<code>create table student (id int, number int);</code>

<code>insert into student values (1,10), (2, 20)</code>

Hive vs. Impala Comparison:
https://www.dezyre.com/article/impala-vs-hive-difference-between-sql-on-hadoop-components/180

Hive Partitioning & Clustering Performance: 
https://blog.cloudera.com/blog/2017/12/faster-performance-for-selective-queries/

## Lab 3.5: Manual Data Ingestion
<code>beeline
!connect jdbc:hive2://localhost:10000 xxx</code>

- Generation of table: <code>CREATE TABLE my_table(product STRING, inventory INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;</code>
- Insert data manually: <code>INSERT INTO my_table VALUES ('A', 10), ('B', 20);</code>
- Insert data from a file: <code>LOAD DATA INPATH '/user/cloudera/3_batch_ingest/manual_ingestion/inventory.txt' INTO TABLE my_table;</code>

## Lab 3.6: Hive with Import

<code>sqoop import --table order_items --connect jdbc:mysql://localhost:3306/retail_db --username=xxx --password=xxx --warehouse-dir=/user/hive/warehouse --hive-import --as-parquetfile --hive-overwrite</code>

## Lab 3.7: Hive with Partitioning

- Dynamic partitioning: <code>set hive.exec.dynamic.partition.mode=nonstrict</code>
- Insert data into partitioned table: <code>insert overwrite table order_items_partitioned partition(order_item_product_id) select order_item_id, order_item_order_id, order_item_product_price, order_item_quantity, order_item_subtotal, order_item_product_id from order_items</code>

## Lab 3.8:  SF Police Incidents use case
### Preparation

- Add jar-file for SerDe: <code>ADD JAR /usr/lib/hive/lib/opencsv-2.3.jar;</code>
- Create table: <code>CREATE TABLE sf_incidents (IncidntNum int, Category string, Descript string, DayOfWeek string, dDate date, Ttime string, PdDistrict string, Resolution string, Address string, x DECIMAL(9, 6), y DECIMAL(9, 6), LLocation string, PdId string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ("separatorChar" = "\\,", "quoteChar" = "\\"", "escapeChar" = "\\\\") STORED AS TEXTFILE;</code>
- Load data: <code>LOAD DATA INPATH '/user/cloudera/3_batch_ingest/sf_crime/Police_Department_Incidents.csv' INTO TABLE sf_incidents;</code>

### Queries

- <code>SELECT Category, count(Category) from sf_incidents group by Category;</code>
- <code>SELECT Category, DayOfWeek, count(Category) from sf_incidents where category= "LIQUOR LAWS" group by category, DayOfWeek;</code>
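
Why a CSV SerDe is needed can be sketched with Python's `csv` module: quoted fields may contain the separator, so a naive split on commas breaks them. The rows below are invented for illustration (they are not taken from the real incidents file), and the counting step mirrors the first query above.

```python
import csv
import io
from collections import Counter

# hypothetical rows in the style of the incidents file (not real data)
raw = io.StringIO(
    'IncidntNum,Category,Descript\n'
    '1,LIQUOR LAWS,"MINOR, POSSESSION OF ALCOHOL"\n'
    '2,ASSAULT,BATTERY\n'
    '3,LIQUOR LAWS,"SALE, TO MINOR"\n'
)

# the csv reader honors the quote character, like the SerDe's quoteChar property
rows = list(csv.DictReader(raw, delimiter=",", quotechar='"'))

# a naive split breaks the quoted description into two fields
naive_fields = '1,LIQUOR LAWS,"MINOR, POSSESSION OF ALCOHOL"'.split(",")

# equivalent of: SELECT Category, count(Category) ... GROUP BY Category
counts = Counter(row["Category"] for row in rows)
print(len(naive_fields), dict(counts))
```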

## Lab 3.9: Import all order tables with sqoop

Drop tables we created so far:
<code>!connect jdbc:hive2://localhost:10000 xxx
drop table if exists order_items;
drop table if exists categories;
drop table if exists customers;</code>

Import whole dataset from MySQL:
<code>sqoop import-all-tables -m 1 --connect jdbc:mysql://quickstart:3306/retail_db --username=xxx --password=xxx --compression-codec=snappy --as-parquetfile --warehouse-dir=/user/hive/warehouse --hive-import</code>

<hr>

# Lecture 4 - Batch Data Processing

## PySpark Example (Slide 22)

In [None]:
from operator import add  # function form of the + operator (Python standard library)

# our input RDD
rdd = sc.parallelize([('NY',10),('NY',20),('NJ',15),('NY',19), ('NJ', 10)])

# build the sum per city
rdd.reduceByKey(add).collect()


# make it a bit more complex: first map city names, then build the sum
def convert(kv):
    mapping = {'NY':'New York', 'NJ':'New Jersey'}
    return (mapping[kv[0]], kv[1])

rdd.map(convert).reduceByKey(add).collect()
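
What `reduceByKey` computes can be sketched without Spark. The helper `reduce_by_key` below is a hypothetical local stand-in, not a Spark API: it folds all values of a key with the given two-argument function.

```python
from operator import add

def reduce_by_key(pairs, func):
    """Combine all values that share a key using func (local stand-in for reduceByKey)."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

pairs = [('NY', 10), ('NY', 20), ('NJ', 15), ('NY', 19), ('NJ', 10)]
totals = reduce_by_key(pairs, add)
print(totals)
```

In Spark the values are first combined within each partition and then across partitions, so the function passed to `reduceByKey` should be associative and commutative.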


In [None]:
# another example
df = sqlContext.read.load("gs://xxx/orders.csv", format="com.databricks.spark.csv", header="true", inferSchema="true")

# get the first five rows
df.take(5)

## Lab 4.1 - Spark & Scala on the Cloudera Machine
<code>// create an RDD from an HDFS file
val file = sc.textFile("hdfs://localhost/user/cloudera/4_batch_process/1_input_wordcount/*")
// transform the RDD: one line for each word 
val words = file.flatMap(line => line.split(" "))
// execute action: count all words (evaluation starts afterwards)
val number_of_words = words.count()
// transform the RDD: (1) prepare for wordcount (2) reduce by key and sum occurrences
val occurences_per_word = words.map(word => (word, 1)).reduceByKey(_ + _)
// show the occurrences (these are both actions)
occurences_per_word.collect()
occurences_per_word.saveAsTextFile("hdfs://localhost/user/cloudera/4_batch_process/output_wordcount")</code>

## Lab 4.2 - Spark & Scala on a DataProc Cluster
### Using the Web SSH-tool
<code>// prepare for Spark Dataset usage
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Spark Test HDM M9").getOrCreate()
import spark.implicits._

// read the csv-File with the order items
val df = spark.read.option("header", "true") .option("inferSchema", "true").csv("gs://spark_scala/order_items.csv")
// look at the metadata
df.printSchema() 
df.head() // look at a sample (first row)</code>
<code>// show the order_id column
df.select("order_id").show() 
// show two columns
df.select("order_item_id", "order_id").show()
// group by the order_id and count the number of line items
df.groupBy("order_id").count().show()
// show aggregates per product
df.groupBy("product_id").sum("quantity", "subtotal").show()
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("order_items")
val sqlDF = spark.sql("SELECT * FROM order_items WHERE product_id=5")
sqlDF.show()
</code>

## Lab 4.3 - GCP with PySpark and Jupyter Notebooks (Alternative 1: local SSH tunnel)
This is the preferred method, also for the project.<br>Recommended WiFi: eduroam<br><br>
Video showing the process for Mac: https://www.youtube.com/watch?v=EdNWgZ4cOWo 

### Step 1: DataProc setup - Initialization Action for JupyterHub
Create a cluster with the following initialization action (within advanced settings):

<code>gs://dataproc-initialization-actions/jupyter/jupyter.sh</code>

### Step 2: Establish SSH connection to the master node

We now want to establish a secure connection to our master node via SSH. We will not use this connection to run commands (as we did in the web shell), but only as a "tunnel" to securely reach the cluster network and, within it, the master node where Jupyter is running.

<font color="red"><b>Open a terminal on your Laptop (shell/Eingabeaufforderung/...)!</b></font>

Initialize your gcloud SDK:<br>
<code>gcloud init</code>
<br><br>
Set your default project:<br>
<code>gcloud config set project project_id</code>
<br><i>Note: project_id is the project ID - needs to be replaced with your project</i>
<br><br>
Create an ssh connection to the master node:<br>
<code>gcloud compute ssh --zone=europe-west1-b --ssh-flag="-D" --ssh-flag="10000" --ssh-flag="-N" "cluster_name-m"</code>
<br><i>Note: cluster_name is the name of your DataProc cluster</i>
<br><br>
<font color="red"><b>Do not close the terminal (and on Windows also not the Putty window)!</b></font>

### Step 3: Open a browser that uses the SSH tunnel as a proxy

<font color="red"><b>Now open a second terminal/shell on your laptop!</b></font>
<br>We now want to open a browser that uses the SSH connection as a proxy (i.e. "tunnels" all traffic through this connection) so that we can reach Jupyter running on our master node.

#### Generic command
<code>"browser executable path" "http://cluster_name-m:8123" --proxy-server="socks5://localhost:10000" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=C:\temp</code>
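
How the generic command is composed can be sketched as a small Python helper that returns the argument list. The function name, its parameters and the default profile directory are assumptions for illustration; the flags are the ones from the command above.

```python
def proxy_browser_command(browser_path, cluster_name, socks_port=10000,
                          jupyter_port=8123, profile_dir="/tmp/proxy-profile"):
    """Return the argument list that starts a browser through the SOCKS tunnel."""
    return [
        browser_path,
        # Jupyter on the master node, resolved inside the cluster network
        "http://{}-m:{}".format(cluster_name, jupyter_port),
        # send all traffic through the local end of the SSH tunnel
        "--proxy-server=socks5://localhost:{}".format(socks_port),
        # prevent local DNS lookups so that host names are resolved via the proxy
        "--host-resolver-rules=MAP * 0.0.0.0 , EXCLUDE localhost",
        # separate browser profile so that normal browsing is not proxied
        "--user-data-dir={}".format(profile_dir),
    ]

# hypothetical browser path and cluster name
cmd = proxy_browser_command("/usr/bin/google-chrome", "my-cluster")
print(cmd)
```

On a laptop, the resulting list could be passed to `subprocess.Popen(cmd)` once the tunnel from step 2 is running.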

#### On a Windows host with chrome and pyspark as DataProc cluster name

https://cloud.google.com/dataproc/docs/tutorials/jupyter-notebook

<code>"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe" "http://cluster_name-m:8123" --proxy-server="socks5://localhost:10000" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=c:\Temp</code>

#### On a Mac host with chrome
<code>/Applications/Google\\ Chrome.app/Contents/MacOS/Google\\ Chrome --proxy-server="socks5://localhost:10000" --user-data-dir=/tmp/${HOSTNAME}</code>


### Step 4: Open Jupyter (on Windows automatically opened)
URL in opened browser window: http://cluster_name-m:8123

## Lab 4.3 - GCP with PySpark and Jupyter Notebooks (Alternative 2: cloud shell)
### Step 1: Create a cluster
See above 
### Step 2: Open the cloud shell
Enter this command to establish an SSH connection from cloud shell to your master node:<br>
<code>gcloud compute ssh cluster_name-m --project=project_id --zone=europe-west1-b -- -4 -N -L 8080:cluster_name-m:8123</code>
### Step 3: Open browser preview
Within the cloud shell you'll find a button "Preview on port 8080" - this should open another browser window with the Jupyter connection.

## Lab 4.3 - GCP with PySpark and Jupyter Notebooks (Alternative 3: HdM JupyterHub)

### Step 1: Create cluster
See above
### Step 2: Establish a tunnel to our JupyterHub virtual machine
You need to set up a tunnel in Putty: go to Connection->SSH->Tunnels and enter <code>10000</code> as "source port" and in the "destination" field <code>localhost:10000</code>. Hit "Add" and save the connection.
<br><br>On a Mac you can use <code>ssh xxx@xxx -p 60022 -L 10000:localhost:10000</code>
### Step 3: Open a browser using the tunnel
See steps 3 and 4 above.

### Import packages with pip
Install pandas from a cell like this:<br>
<code>import sys
!{sys.executable} -m pip install pandas</code>
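
Outside of a notebook cell, the same installation can be sketched with `subprocess`. Using `sys.executable` makes sure the package is installed for the interpreter that runs the kernel and not for some other Python on the machine. The helper names below are hypothetical.

```python
import subprocess
import sys

def pip_install_command(package):
    """Build the pip command for the interpreter that is currently running."""
    return [sys.executable, "-m", "pip", "install", package]

def pip_install(package):
    """Run the installation; raises CalledProcessError if pip fails."""
    subprocess.check_call(pip_install_command(package))

# only build and show the command here; call pip_install("pandas") to actually install
cmd = pip_install_command("pandas")
print(cmd)
```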

### Code of the Lab

In [None]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sqlContext = SQLContext(sc)

df = sqlContext.read.load("gs://pyspark_example/order_items.csv",
                         format="com.databricks.spark.csv",
                         header="true",
                         inferSchema="true")

# how many distinct products
df.select("product_id").distinct().collect()

# how many line items per product
df.groupBy("product_id").count().collect()

# read the orders file
df_orders = sqlContext.read.load("gs://pyspark_example/orders.csv",
                         format="com.databricks.spark.csv",
                         header="true",
                         inferSchema="true")

# show all possible order states
df_orders.select("order_status").distinct().collect()
# calculate open order items per status and customer (without closed and canceled orders)
open_order_items = df.join(df_orders.filter((df_orders.order_status!='CLOSED') & (df_orders.order_status!='CANCELED')), 
                                     df.order_id == df_orders.order_id)

open_order_items.groupBy("customer_id", "order_status").count().collect()


# write back results as a view (lambda architecture)
result_df = open_order_items.groupBy("customer_id", "order_status").count()
result_df.write.save("gs://pyspark_example/results/", format='csv')

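# show a histogram of line items per order
import matplotlib.pyplot as plt  # needed for plt.show()

pandas_df = df.groupBy("order_id").count().withColumnRenamed('count', 'no_lines').groupBy('no_lines').count().toPandas()

# set_index returns a new DataFrame, so keep the result
pandas_df = pandas_df.set_index('no_lines')

pandas_df['count'].plot(kind='bar')
plt.show()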
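
The filter, join and aggregation steps of this lab can be sketched in plain Python on a few rows. The records below are invented miniature versions of the two CSV files (only the column names are taken from the lab code), so the numbers carry no meaning beyond illustration.

```python
from collections import Counter

# hypothetical miniature versions of order_items.csv and orders.csv
order_items = [
    {"order_id": 1, "product_id": 5},
    {"order_id": 1, "product_id": 7},
    {"order_id": 2, "product_id": 5},
    {"order_id": 3, "product_id": 9},
]
orders = [
    {"order_id": 1, "customer_id": 100, "order_status": "PENDING"},
    {"order_id": 2, "customer_id": 200, "order_status": "CLOSED"},
    {"order_id": 3, "customer_id": 100, "order_status": "PROCESSING"},
]

# filter: keep only orders that are neither closed nor canceled
open_orders = {
    o["order_id"]: o for o in orders
    if o["order_status"] not in ("CLOSED", "CANCELED")
}

# inner join on order_id, then count line items per (customer_id, order_status)
open_item_counts = Counter(
    (open_orders[item["order_id"]]["customer_id"],
     open_orders[item["order_id"]]["order_status"])
    for item in order_items
    if item["order_id"] in open_orders
)

# histogram: how many orders have 1, 2, ... line items
lines_per_order = Counter(item["order_id"] for item in order_items)
histogram = Counter(lines_per_order.values())

print(dict(open_item_counts))
print(dict(histogram))
```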

## Lab 4.4 - RapidMiner with Spark  & Hadoop
Add this to the hosts file (Windows: C:\Windows\System32\drivers\etc\hosts, Mac: /etc/hosts):<br>
<code>xxx quickstart.cloudera</code>
- NameNode Address: <code>quickstart.cloudera</code>
- Advanced Parameters: <code>dfs.client.use.datanode.hostname</code> - set to true
- Assembly Jar Location: <code>hdfs:///user/cloudera/spark-assembly.jar</code>
- Hive Server Address: <code>quickstart.cloudera</code> (or xxx)
 - Username: xxx
 - Password: xxx

<hr>

# Lecture 5 - Data Extraction and Query Languages

## Lab 5.5 - View Generation with sqoop
1. <code>mysql -u xxx -p -D xxx
CREATE TABLE order_statistics (order_customer_id BIGINT, order_status VARCHAR(200), sum_order_item_quantity BIGINT, count_order_id BIGINT);</code>
2. <code>sqoop export --connect jdbc:mysql://localhost/retail_db --driver com.mysql.jdbc.Driver --username xxx --password xxx --table order_statistics --hcatalog-table order_statistics</code>
3. <code>select * from order_statistics;</code>

## Lab 5.6 - View Generation in GCP
Open the cloud shell and prepare the Police Departments Dataset (unzip and upload back to bucket) 
- <code>gsutil cp gs://sf_crime/Police_Department_Incidents.zip .</code>
- <code>unzip Police_Department_Incidents.zip</code>
- <code>gsutil cp Police_Department_Incidents.csv gs://sf_crime/</code>
- <code>rm ./Police_Department_Incidents</code>
- Remove the zip file from the bucket, e.g. <code>gsutil rm gs://sf_crime/Police_Department_Incidents.zip</code>

<hr>

# Lecture 6 - Stream Processing

## Lab 6.1 - Kafka producer / consumer
SSH into our Ubuntu machine:
- IP: xxx
- Port: 60022
- User: xxx
- Password: provided in lecture

Create a console producer
1. Go to: <code>cd /opt/kafka/kafka_2.12-2.1.0/bin</code>
2. Create a topic: <code>kafka-topics.sh --zookeeper xxx:2181 --create --topic hdmm9 --partitions 1 --replication-factor 1</code>
3. Create a “console producer”: <code>kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic</code>

Open another shell (see above)<br>
Create a consumer: <code>kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic</code>

## Lab 6.3 - GCP PubSub
Also see the templates folder in JupyterHub (/notebooks/template/pubsub).

1. Subscribe to the NY-Taxi stream: <code>gcloud pubsub subscriptions create test --topic projects/pubsub-public-data/topics/taxirides-realtime</code>
2. Pull streaming data: <code>gcloud pubsub subscriptions pull projects/your project name/subscriptions/test</code>

## Lab 6.3b - GCP PubSub with Python
See the templates folder in JupyterHub (/notebooks/template/pubsub).

## Lab 6.4 - Spark Streaming with Scala
<code>import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 30999, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()</code>

## Lab 6.4b - Spark Streaming with PySpark (Shell)
Enter this code in a pyspark shell

In [None]:
import sys
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from uuid import uuid1

ssc = StreamingContext(sc, 10) # 10 second window

kvs = KafkaUtils.createStream(ssc, "xxx:2181", "raw-event-streaming-consumer",{"mytopic":1})  # replace mytopic with a Kafka topic to which you can publish

lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
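
The micro-batch behavior of this word count can be sketched without Spark or Kafka. In the hypothetical helper below, each inner list stands for the messages that arrived within one batch interval. The counts are computed per batch and are not accumulated across batches, which matches what `counts.pprint()` shows for each interval.

```python
from collections import Counter

def count_words_per_batch(batches):
    """Yield one word-count dictionary per batch (no state is kept between batches)."""
    for batch in batches:
        counts = Counter()
        for line in batch:
            counts.update(line.split(" "))
        yield dict(counts)

# hypothetical messages, grouped by the batch interval in which they arrived
batches = [["hello kafka", "hello spark"], ["spark streaming"]]
results = list(count_words_per_batch(batches))
print(results)
```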

## Lab 6.4c - Spark Streaming with PySpark (Job-Execution)
Enter this code in a file called <code>pyspark_streaming_kafka.py</code>

In [None]:
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
        sc = SparkContext(appName="PythonStreamingKafkaWordCount")
        ssc = StreamingContext(sc, 10) # 10 second window

        kvs = KafkaUtils.createStream(ssc, "xxx:2181", "raw-event-streaming-consumer",{"my_topic":1})  # set my_topic to a Kafka topic to which you can publish

        lines = kvs.map(lambda x: x[1])
        counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
        counts.pprint()

        ssc.start()
        ssc.awaitTermination()

Submit this file as a job to Spark:
<code>spark-submit --master=yarn pyspark_streaming_kafka.py</code>

Push some messages to the topic my_topic

## Lab 6.4d - Spark Structured Streaming with PySpark (Spark v1.6)

In [None]:
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext  # SQLContext is used in getSqlContextInstance

def getSqlContextInstance(sparkContext):
    """ Helper function to get the spark sql context (newer versions don't need this)"""
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(time, rdd):
    """ process the rdd (of each window) based on structured streaming """
    print("========= %s =========" % str(time)) # print the time of the window
    try:
        sqlContext = getSqlContextInstance(rdd.context)  # call helper function
        rowRdd = rdd.map(lambda w: Row(word=w)) # create a Row object for each line of the window's input
        wordsDataFrame = sqlContext.createDataFrame(rowRdd)  # create a DataFrame (-> StructuredStreaming)
        wordsDataFrame.registerTempTable("words") # register temp table
        wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") # count words
        wordCountsDataFrame.show() # show the result
    except Exception as err:
        print(err)


ssc = StreamingContext(sc, 5) # 5 second window
kvs = KafkaUtils.createStream(ssc, "141.62.117.222:2181", "raw-event-streaming-consumer",{"my_topic":1}) # listen to kafka topic
lines = kvs.map(lambda x: x[1]) # a kafka message is a tuple (key, value) - we want to analyze the value
lines.foreachRDD(process) # we "register" the process method (see above) to be executed for each incoming rdd (i.e. per window)
ssc.start()  # start structured streaming
ssc.awaitTermination()  # wait for user termination (Ctrl-c) to stop stream processing
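
The SQL step inside `process` can be tried locally with Python's built-in `sqlite3` module. This sketch swaps Spark SQL for an in-memory SQLite database, so it only illustrates the query itself; the helper name and the sample words are assumptions, and an `order by` is added to make the output order deterministic.

```python
import sqlite3

def count_words(words):
    """Load the words of one window into a temporary table and count them with SQL."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE words (word TEXT)")
    conn.executemany("INSERT INTO words VALUES (?)", [(w,) for w in words])
    rows = conn.execute(
        "select word, count(*) as total from words group by word order by word"
    ).fetchall()
    conn.close()
    return rows

# hypothetical content of one window
window_counts = count_words(["a", "b", "a"])
print(window_counts)
```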


<hr>

# Lecture 7 - NoSQL

## Lab 7.4 - MongoDB
See the templates folder in JupyterHub (/notebooks/template/mongodb).