# Dados de Entrada
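* Em cada link abaixo, selecione "Adicionar atalho ao Drive" e coloque a pasta na raiz de "Meu Drive" (o código lê, por exemplo, `/content/drive/My Drive/amz/small.csv`)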
* Links: 
  * https://tinyurl.com/bigdata-gut-pt
  * https://tinyurl.com/bigdata-amz





# Setup

## Instalação de pacotes

In [None]:
!apt-get update > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!apt-get install netcat
!wget -q https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
!tar xf spark-3.2.4-bin-hadoop3.2.tgz
!pip install findspark pyspark 

## Acesso ao Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## Preparação do ambiente

In [None]:
%env PYTHONHASHSEED=1234
%env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
%env SPARK_HOME=/content/spark-3.2.4-bin-hadoop3.2

In [None]:
import findspark
findspark.init("/content/spark-3.2.4-bin-hadoop3.2")

In [None]:
findspark.find()


# Streaming de arquivos

## Streaming Básico

In [None]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext


In [None]:
conf = SparkConf().setAppName('Big Data Streaming').setMaster('local[*]')
sc = SparkContext.getOrCreate(conf)

streaming = StreamingContext(sc, 1)

In [None]:
!mkdir /content/streaming_test

In [None]:
fstream = streaming.textFileStream('/content/streaming_test')



In [None]:
import re

def words_in_line(line) :
  """Count the words in one line of text.

  The line is lowercased, every character that is not an ASCII letter, a
  lowercase accented letter (U+00E0-U+00FF, except the division sign) or a
  space is replaced by a space, and the remaining tokens are counted.

  Returns:
    A pair (None, count). The constant None key makes `reduceByKey` add the
    counts of every line in a batch into a single total.
  """
  # à-ö and ø-ÿ cover U+00E0-U+00F6 and U+00F8-U+00FF: every lowercase accented
  # Latin-1 letter (including ú, û, ü used in Portuguese) but not ÷ (U+00F7).
  line = re.sub('[^a-zà-öø-ÿ ]', ' ', line.lower())
  return (None, len(line.split()))


In [None]:
wc = fstream.map(words_in_line).reduceByKey(lambda acc, v: acc + v)


In [None]:
wc.pprint()

In [None]:
!rm -rf /content/streaming_test/*

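Após iniciar o streaming, copie alguns arquivos de texto para o diretório `/content/streaming_test`. O `textFileStream` só processa arquivos que aparecerem no diretório depois do início do streaming; arquivos que já estavam lá são ignorados. Como a célula seguinte fica bloqueada por até 120 segundos, faça a cópia por fora do notebook (por exemplo, pelo painel de arquivos ou por um terminal).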

In [None]:
streaming.start()             
query = streaming.awaitTermination(120)  

In [None]:
# Stop the StreamingContext first; with stopSparkContext=True it then stops the
# underlying SparkContext too. Calling sc.stop() first would leave the still
# running stream scheduling batches on an already-stopped SparkContext.
streaming.stop(stopSparkContext=True, stopGraceFully=False)

## Processamento para cada RDD no Stream

In [None]:
conf = SparkConf().setAppName('Big Data Streaming').setMaster('local[*]')
sc = SparkContext.getOrCreate(conf)

streaming = StreamingContext(sc, 5)

In [None]:
fstream = streaming.textFileStream('/content/streaming_test')


In [None]:
!rm /content/streaming_test/*

In [None]:
import re

def split_words(line) :
  """Yield a (word, 1) pair for every word in one line of text.

  The line is lowercased, and every character that is not an ASCII letter, a
  lowercase accented letter (U+00E0-U+00FF, except the division sign) or a
  space becomes a word separator. The pairs are meant to be summed with
  `reduceByKey` to obtain per-word counts.
  """
  # à-ö and ø-ÿ cover U+00E0-U+00F6 and U+00F8-U+00FF: every lowercase accented
  # Latin-1 letter (including ú, û, ü used in Portuguese) but not ÷ (U+00F7).
  line = re.sub('[^a-zà-öø-ÿ ]', ' ', line.lower())
  for w in line.split() :
    yield (w, 1)


In [None]:
wc = fstream.flatMap(split_words) \
            .reduceByKey(lambda acc, v: acc + v)

In [None]:
wc.pprint()

In [None]:
all_res = []

In [None]:
def process_rdd(time, rdd):
  """Record the 20 most frequent (word, count) pairs of one micro-batch.

  Used as a `foreachRDD` callback: `time` is the batch time and `rdd` holds the
  batch's (word, count) pairs. Appends a list of at most 20 pairs, highest
  count first, to the notebook-level `all_res` list; empty batches are skipped.
  """
  # `all_res` is only mutated (append), never rebound, so no `global` is needed.
  if not rdd.isEmpty():
    # takeOrdered keeps a bounded top-k per partition instead of running a full
    # distributed sort (sortBy) of the whole batch just to read 20 elements.
    all_res.append(rdd.takeOrdered(20, key=lambda pair: -pair[1]))


In [None]:
wc.foreachRDD(process_rdd)

In [None]:
!rm -rf /content/streaming_test/*

In [None]:
streaming.start()             
query = streaming.awaitTermination(120)  

In [None]:
all_res

In [None]:
# Flatten the per-batch top-20 lists into a single list of (word, count) pairs.
# A comprehension is linear; sum(lists, []) re-copies the accumulator each step.
all_res_merged = [pair for batch in all_res for pair in batch]

In [None]:
all_res_merged

In [None]:
merged_rdd = sc.parallelize(all_res_merged)

In [None]:
merged_rdd

In [None]:
total_rdd = merged_rdd.reduceByKey(lambda acc, v : v+acc)

In [None]:
# take(10) after reduceByKey returns 10 arbitrary words; rank by the summed
# count so the output shows the most frequent words across all batches.
total_rdd.takeOrdered(10, key=lambda pair: -pair[1])

In [None]:
# Stop the StreamingContext first; with stopSparkContext=True it then stops the
# underlying SparkContext too. Calling sc.stop() first would leave the still
# running stream scheduling batches on an already-stopped SparkContext.
streaming.stop(stopSparkContext=True, stopGraceFully=False)

# Streaming via Rede

## Revisão sobre sockets cliente-servidor



In [None]:
import socket


In [None]:
def server(host='localhost', port=9998) :
  """Run a toy TCP server that answers every client with a fixed greeting.

  Listens on (host, port), prints the first message (up to 1024 bytes) each
  client sends and replies with 'Eu sou o Servidor'. Loops forever; stop it by
  terminating the process that runs it.
  """
  with socket.socket() as s:
    # Lets the cell be re-run right away without "Address already in use".
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    print("Socket Criado no Servidor")
    s.bind((host, port))
    s.listen(3)
    print("Esperando Conexões")

    while True:
      c, addr = s.accept()
      # `with` closes the connection even if recv/sendall raises.
      with c:
        name = c.recv(1024).decode()
        print("Cliente ",addr, " falou: ", name)
        # sendall keeps writing until every byte is sent; send may stop early.
        c.sendall(bytes('Eu sou o Servidor','utf-8'))

In [None]:
def client(host='localhost', port=9998) :
  """Connect to the toy server, send a greeting and print its reply.

  Reads at most 1024 bytes of the reply. The socket is closed on exit.
  """
  with socket.socket() as c:
    c.connect((host, port))
    name = "Eu sou o Cliente"
    # sendall keeps writing until every byte is sent; send may stop early.
    c.sendall(bytes(name,'utf-8'))
    print('Servidor falou: '+c.recv(1024).decode())

In [None]:
from multiprocessing import Process

sp = Process(target=server)
sp.start()



In [None]:
cp = Process(target=client)
cp.start()


In [None]:
cp = Process(target=client)
cp.start()

In [None]:
sp.terminate()

In [None]:
cp.join()
sp.join()


## Envio de um stream de dados


In [None]:
import socket
import time

In [None]:
def server_streaming(filename='/content/drive/My Drive/amz/small.csv',
                     nlines=600, delay=0.1, host='localhost', port=9998) :
  """Stream the first lines of a text file to every TCP client, one at a time.

  Each client that connects receives up to `nlines` lines of `filename` (fewer
  if the file is shorter), with a pause of `delay` seconds after each line, and
  is then disconnected. Loops forever; stop it by terminating its process.

  Args:
    filename: text file whose lines are sent (read as UTF-8).
    nlines: maximum number of lines sent per connection.
    delay: pause in seconds after each line, to simulate a live data source.
    host, port: address the server listens on.
  """
  with socket.socket() as s:
    # Lets the cell be re-run right away without "Address already in use".
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    print("Socket Criado no Servidor")
    s.bind((host, port))
    s.listen(3)
    print("Esperando Conexões")

    with open(filename, encoding='utf-8') as file:
      # Slicing caps the count without raising IndexError on a short file.
      content = file.readlines()[:nlines]

    while True:
      c, addr = s.accept()
      print("Conectado com ",addr)
      with c:
        try:
          for line in content:
            # sendall keeps writing until every byte is sent; send may stop early.
            c.sendall(bytes(line,'utf-8'))
            time.sleep(delay)
        except OSError as err:
          # The client went away mid-stream (e.g. nc or Spark was stopped):
          # drop this connection and keep serving new ones.
          print("Conexão encerrada por ", addr, ": ", err)

In [None]:
from multiprocessing import Process

sp = Process(target=server_streaming)
sp.start()



In [None]:
!nc localhost 9998 

In [None]:
sp.terminate()
sp.join()


# Spark Streaming via Rede

In [None]:
from multiprocessing import Process

sp = Process(target=server_streaming)
sp.start()

In [None]:
conf = SparkConf().setAppName('Big Data Streaming').setMaster('local[*]')
sc = SparkContext.getOrCreate(conf)

streaming = StreamingContext(sc, 10)

In [None]:
netstream = streaming.socketTextStream("localhost", 9998)

In [None]:
# Per 10 s batch: split each received line on commas, key it by column 0 and
# pair column 2 (parsed as float) with a count of 1; reduceByKey adds up the
# (sum, count) pairs per key and mapValues divides them to get the mean.
# NOTE(review): presumably column 0 is a product id and column 2 a review score;
# this assumes every line has a numeric column 2 — a header line or a quoted
# field containing commas would break split(',')/float(). Confirm against small.csv.
prod_rev = netstream.map(lambda line: (line.split(','))) \
              .map(lambda line: (line[0], (float(line[2]), 1))) \
              .reduceByKey(lambda acc, v: (acc[0] + v[0], acc[1] + v[1])) \
              .mapValues(lambda v: v[0]/v[1])


In [None]:
prod_rev.pprint()

In [None]:
streaming.start()             
query = streaming.awaitTermination(60)  

In [None]:
# Stop the StreamingContext first; with stopSparkContext=True it then stops the
# underlying SparkContext too. Calling sc.stop() first would leave the still
# running stream scheduling batches on an already-stopped SparkContext.
streaming.stop(stopSparkContext=True, stopGraceFully=False)

In [None]:
sp.terminate()
sp.join()
