# Hadoop, Map-Reduce and Spark demo

In [None]:
import os
import re
import json
import socket
import subprocess
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import utils as pu
from pyspark.sql import functions as F
from pyspark.sql import types as pt

In [None]:
# HDFS directory used by the cells below for uploading and reading the data.
WORK_DIR = '/jovyan'

## HDFS operations

In [None]:
def hdfs_dirs(path, filter_str=''):
    """List the entries of an HDFS directory using ``hdfs dfs -ls``.

    Args:
        path: HDFS path to list.
        filter_str: Keep only listing lines that contain this substring.
            The match is made against the whole listing line, not only
            the entry path. The default ``''`` keeps every entry.

    Returns:
        A list holding the last space-separated token of every matching
        listing line, i.e. the entry path for paths without spaces.
        Blank lines and the ``Found N items`` summary line are skipped.
        The list is empty when the command writes nothing to stdout;
        stderr is captured and discarded.
    """
    result = subprocess.run(
        ['hdfs', 'dfs', '-ls', path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    entries = []
    for line in result.stdout.decode('utf-8').splitlines():
        if not line.strip():
            # Trailing/blank lines carry no entry.
            continue
        if line.startswith('Found ') and line.endswith(' items'):
            # Summary header printed before the entries, not an entry.
            continue
        if filter_str in line:
            entries.append(line.split(' ')[-1])
    return entries

def file_content(path):
    """Return the text of an HDFS file read with ``hdfs dfs -cat``.

    Args:
        path: HDFS path of the file to read.

    Returns:
        The command's stdout decoded as UTF-8. Bytes that are not valid
        UTF-8 are replaced with U+FFFD instead of raising. Backslashes in
        the file are returned literally. The string is empty when the
        command writes nothing to stdout; stderr is captured and discarded.
    """
    result = subprocess.run(
        ['hdfs', 'dfs', '-cat', path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # 'unicode_escape' would read the bytes as Latin-1 and interpret
    # backslash sequences found in the data, corrupting non-ASCII text.
    return result.stdout.decode('utf-8', errors='replace')

In [None]:
# List the contents of the HDFS root directory.
!hdfs dfs -ls /

In [None]:
# List the working directory on HDFS; {WORK_DIR} is filled in from the Python variable.
!hdfs dfs -ls {WORK_DIR}

In [None]:
!hdfs dfs -put ./data/telecom_churn.csv {WORK_DIR}

In [None]:
# List the working directory on HDFS to check that the CSV is there.
!hdfs dfs -ls {WORK_DIR}

In [None]:
# Show the entries of WORK_DIR whose listing line contains 'csv'.
hdfs_dirs(WORK_DIR, 'csv')

In [None]:
# Read the uploaded CSV back from HDFS and print its first 512 characters.
content = file_content(f'{WORK_DIR}/telecom_churn.csv')
print(content[:512])

## Spark

In [None]:
# Print the JupyterHub service prefix of this server; raises KeyError
# when JUPYTERHUB_SERVICE_PREFIX is not set in the environment.
print('user:', os.environ['JUPYTERHUB_SERVICE_PREFIX'])

def uiWebUrl(self):
    """Return the JupyterHub-proxied path of this context's Spark jobs page.

    The port is taken from the UI address reported by the underlying JVM
    context, and ``proxy/<port>/jobs/`` is appended to the value of the
    ``JUPYTERHUB_SERVICE_PREFIX`` environment variable.
    """
    from urllib.parse import urlparse

    spark_ui_address = self._jsc.sc().uiWebUrl().get()
    ui_port = urlparse(spark_ui_address).port
    hub_prefix = os.environ['JUPYTERHUB_SERVICE_PREFIX']
    return f'{hub_prefix}proxy/{ui_port}/jobs/'

# Replace SparkContext.uiWebUrl with the proxied path computed by uiWebUrl().
SparkContext.uiWebUrl = property(uiWebUrl)

# Local master using all cores, with 4g of driver memory.
conf = SparkConf().set('spark.master', 'local[*]').set('spark.driver.memory', '4g')
# getOrCreate reuses an already running context, so re-running this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
# Last expression of the cell: display the session.
spark

In [None]:
# Load the CSV from the HDFS working directory into a Spark DataFrame,
# using ',' as separator and the first line as header, then print the schema.
sdf = (
    spark.read
    .options(sep=',', header=True)
    .csv(f'{WORK_DIR}/telecom_churn.csv')
)
sdf.printSchema()

In [None]:
# Bring the first 5 rows to the driver as a pandas DataFrame and display them.
sdf.limit(5).toPandas().head()

In [None]:
# Count the rows of the Spark DataFrame and print the total.
print('total rows in spark dataframe:', sdf.count())

## Map-Reduce

In [None]:
%%bash

# Test Hadoop cluster by running wordcount task
# based on https://github.com/Segence/docker-hadoop
# by Rob Vadai https://twitter.com/robvadai
#
# The cell is written to be re-runnable: uploads overwrite earlier copies
# and the output directory of a previous run is removed before the job.

work_dir=/jovyan
temp_dir=tohdfs

# create input files
mkdir -p ${temp_dir}
echo "Er legt die Nadel auf die Ader Und bittet die Musik herein Zwischen Hals und Unterarm Die Melodie fährt leise ins Gebein" > ${temp_dir}/part1.txt
echo "Er hat die Augen zugemacht In seinem Blut tobt eine Schlacht Ein Heer marschiert durch seinen Darm Die Eingeweide werden langsam warm" > ${temp_dir}/part2.txt

# create input directory on HDFS
hadoop fs -mkdir -p ${work_dir}/input

# put input files to HDFS (-f overwrites files left by a previous run)
hdfs dfs -put -f ./${temp_dir}/* ${work_dir}/input

# remove the output of a previous run; the job fails if the output
# directory already exists (-f: no error when there is nothing to remove)
hdfs dfs -rm -r -f ${work_dir}/output

# run wordcount
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.2.jar wordcount ${work_dir}/input ${work_dir}/output

# print the input files
echo -e "\ninput part1.txt:"
hdfs dfs -cat ${work_dir}/input/part1.txt

echo -e "\ninput part2.txt:"
hdfs dfs -cat ${work_dir}/input/part2.txt

# print the output of wordcount
echo -e "\nwordcount output:"
hdfs dfs -cat ${work_dir}/output/part-r-00000