# Práctica Big Data Architecture

En este colab voy a desarrollar la parte práctica del ejercicio. Dado que Google cloud nos permite gestionar los recursos que necesitemos con su API he optado por automatizar todo el proceso. Así que en este colab se van a generar los ficheros a partir del [API de Stack Exchange](https://api.stackexchange.com/), se va a crear el cluster de Hadoop, se van a lanzar los jobs de Hadoop y Hive y se va a visualizar el resultado de estos jobs

Para realizar las llamadas al API de Stack Exchange usamos la librería de [Python Requests](http://docs.python-requests.org/en/master/)

Para realizar las operaciones sobre Google Cloud usamos la [librería que proporciona Google](https://developers.google.com/api-client-library/python/)

Ahora vamos a realizar los imports necesarios, inicializamos las variantes que identifican al cluster, ficheros, storage, etc.
Para poder realizar las tareas en Google Cloud se necestira un fichero de credenciales como se explica [en este enlace](https://cloud.google.com/docs/authentication/getting-started). Durante la ejecución se solicitará que se suba el fichero de credenciales. Es relativamente seguro porque se  sube a tu Google Drive.



In [1]:
import json
import requests

from datetime import datetime, timedelta
import time
import os

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

# damos permiso a google colab para poder usar gsutil y subir los ficheros generados a nuestro Google Storage
from google.colab import auth
auth.authenticate_user()

# Para ello hay que proporcionar un fichero de credenciales de Google cloud. 
# El siguiente pide el fichero de credenciales para subirlo a Google Drive
from google.colab import files
uploaded = files.upload()

# ID del proyecto de Google Cloud
project_id = "positive-record-228820"

# Nombre con en el que se va a crear el cluster de Hadoop
cluster_name = "cluster-bdarchitecture-stackoverflow"

# Zona donde se va a crear el cluster
zone = "europe-west4-a"

# Region donde se va a crear el cluster
region = "europe-west4"

# Nombre del bucket de Google Storage donde se van a almacenar los ficheros
bucket_name = "dataproc-d3c1f40b-c187-47f1-a030-87fd5670b613-europe-north1"
os.environ['BUCKET_NAME'] = bucket_name

# IMPORTANTE!!!!  Este nombre de fichero debe ser el del fichero subido en el paso anterior
credentials_file = 'BD-Architecture-157d0541f3f7.json'


Saving BD-Architecture-157d0541f3f7.json to BD-Architecture-157d0541f3f7.json


## Spring 2

Vamos a generar dos ficheros con el API que proporciona Stack Exchange. 

En el primer fichero vamos a obtener todos los usuarios que hayan respondido a una pregunta en Stack Overflow en el último día. Opcionalmente se puede ampliar este periodo de tiempo, aunque conviene no subir mucho este número ya que el API nos limita a 300 peticiones al API durante 6 horas.

El segundo fichero, a partir de los IDs obtenidos en el primer fichero, vamos a obtener los nombres, reputación y pais de cada usuario. Ambos ficheros se almacenan en el bucket configurado

In [0]:
# obtenemos la fecha de hace 1 dia
d = datetime.today() - timedelta(days=1)

fromdate = int(d.timestamp())

url_base = "https://api.stackexchange.com/2.2/answers?&order=asc&sort=activity&site=stackoverflow&pagesize=100&fromdate=" + str(fromdate)
has_more = True
pagina = 1

with open('user_ids_answers', 'w') as f_user_ids_answers:    
    while(has_more) :
        url_request = url_base + "&page=" + str(pagina)
        response = requests.get(url_request)

        result = response.json()
        
        if (result.get('error_id')):
            print("Error: " + result.get('error_message'))
            break;

        
        for answer in result['items']:
            owner = answer['owner']
            if (owner.get('user_id')): # algunas peticiones no traen el user_id
              f_user_ids_answers.write(str(answer['owner']['user_id']) + "\n")

        print(end=".")

        has_more = result['has_more']
        pagina = pagina + 1
        time.sleep(1)

# Subimos el fichero al storage de Google Cloud
!gsutil cp user_ids_answers gs://$BUCKET_NAME

.............................

In [0]:
!rm user_ids_names
# ahora que user ID de los usurios que han comentado, obtemos el nombre de usuario de cada uno para luego cruzar los datos en HIVE
# con el nombre de usuario y el is generamos un fichero csv
with open('user_ids_answers', 'r') as f_user_ids_answers:
    # El API de stackexchange nos permite 
    # https://api.stackexchange.com/docs/users-by-ids

    i = 0
    users_url = ""
    for user_id in f_user_ids_answers:
        user_id = f_user_ids_answers.readline().rstrip()
        
        if (i >= 100):
            # quitamos el ultimo ; y hacemos la peticion para obtener los datos de los usuarios
            users_url = users_url[:-1]
            url = "https://api.stackexchange.com/2.2/users/" + users_url + "?pagesize=100&order=desc&sort=reputation&site=stackoverflow"
            #print(url)
            print(end=".")
            response = requests.get(url)
            result = response.json()

            with open('user_ids_names', 'a') as f_user_ids_names:
                if (result.get('error_id')):
                    print("Error: " + result.get('error_message'))
                else:    
                    for user in result['items']:
                        user_id = user['user_id']
                        name = user.get('display_name')
                        reputation = user.get('reputation')
                        location = user.get('location')
                        f_user_ids_names.write(str(user_id) + "," + name + "," + str(reputation) + "," + str(location) + "\n")

            i = 0
            users_url = ""
        
        users_url = users_url + str(user_id) + ";"
        i = i + 1



#!cat user_ids_answers
#!pwd
#!ls
#!cat user_ids_answers
#from google.colab import files
#files.download('user_ids_answers')
#files.download('user_ids_names')

# Subimos el fichero al storage de Google Cloud
!gsutil cp user_ids_names gs://$BUCKET_NAME/

rm: cannot remove 'user_ids_names': No such file or directory
..............................................Copying file://user_ids_names [Content-Type=application/octet-stream]...
-
Operation completed over 1 objects/162.1 KiB.                                    


## Spring 3

En este paso creamos el cluster en Google cloud. El cluster va a constar de un nodo master y dos slaves. Dado que no necesitamos gran potencia de proceso he configurado el cluster con las instancias más pequeñas y un espacio en disco de 30GB cada una.

In [0]:
# Ahora vamos a crear el cluster en Google Cloud mediante el API de Python que proporciona Google.

def get_client():
  """Builds a client to the dataproc API."""
  dataproc = build('dataproc', 'v1', credentials=credentials)
  return dataproc

def create_cluster(dataproc, project, zone, region, cluster_name, bucket_name):
  print('Creating cluster...')
  zone_uri = \
    'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
      project, zone)
  cluster_data = {
    'projectId': project,
    'clusterName': cluster_name,
    'config': {
      "configBucket": bucket_name,
      'gceClusterConfig': {
        'zoneUri': zone_uri
      },
      'masterConfig': {
        'numInstances': 1,
        'machineTypeUri': 'n1-standard-1',
        'diskConfig': {
          "bootDiskType": "pd-standard",
          "bootDiskSizeGb": 30,
          "numLocalSsds": 0
        }
      },
      'workerConfig': {
        'numInstances': 2,
        'machineTypeUri': 'n1-standard-1',
        "diskConfig": {
          "bootDiskType": "pd-standard",
          "bootDiskSizeGb": 30,
          "numLocalSsds": 0
        }
      },
      "softwareConfig": {
        "imageVersion": "1.3-deb9"
      },
      "secondaryWorkerConfig": {
        "numInstances": 0,
        "isPreemptible": True
      }
    }
  }
  result = dataproc.projects().regions().clusters().create(
    projectId=project,
    region=region,
    body=cluster_data).execute()
  return result


def list_clusters(dataproc, project, region):
  result = dataproc.projects().regions().clusters().list(
    projectId=project,
    region=region).execute()
  return result

credentials = GoogleCredentials.from_stream(credentials_file)

dataproc = get_client()
create_cluster(dataproc, project_id, zone, region, cluster_name, bucket_name)


Creating cluster...


{'metadata': {'@type': 'type.googleapis.com/google.cloud.dataproc.v1.ClusterOperationMetadata',
  'clusterName': 'cluster-bdarchitecture-stackoverflow',
  'clusterUuid': '364ad0fc-807e-4906-be34-c4accb397daa',
  'description': 'Create cluster with 2 workers',
  'operationType': 'CREATE',
  'status': {'innerState': 'PENDING',
   'state': 'PENDING',
   'stateStartTime': '2019-01-24T18:09:42.258Z'},
 'name': 'projects/positive-record-228820/regions/europe-west4/operations/dbbb670f-7c91-494b-ace8-100268a72a7d'}

## Spring 4

Lanzamos un trabajo al cluster hadoop que ejecuta un WordCount sobre el fichero con los IDs de los  usuarios que han realizado alguna respuesta en el último día. Así cuantas veces ha respondido cada usuario.



In [0]:
# En este paso esperamos a que se cree el culster y despues lanzamos un job para que se haga el wordcount del fichero de IDS
def submit_haddop_job(dataproc, project, region,
                       cluster_name, bucket_name, filename):
  job_details = {
    'projectId': project,
    'job': {
      'placement': {
        'clusterName': cluster_name
      },
      'hadoopJob': {
        "args": ["wordcount", 'gs://{}/{}'.format(bucket_name, filename), "output"],
        "mainJarFileUri": "file:////usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
      }
    }
  }
  result = dataproc.projects().regions().jobs().submit(
    projectId=project,
    region=region,
    body=job_details).execute()
  job_id = result['reference']['jobId']
  print('Submitted job ID {}'.format(job_id))
  return job_id

def wait_for_job(dataproc, project, region, job_id):
  print('Waiting for job to finish...')
  while True:
    result = dataproc.projects().regions().jobs().get(
    projectId=project,
    region=region,
    jobId=job_id).execute()
    # Handle exceptions
    if result['status']['state'] == 'ERROR':
      raise Exception(result['status']['details'])
    elif result['status']['state'] == 'DONE':
      print('Job finished.')
      return result

while True:
  print("waiting cluster creation") 
  clusters = list_clusters(dataproc, project_id, region)

  cluster_running = False
  for cluster in clusters.get('clusters'):
    #print(cluster['clusterName'])
    print("CLUSTER STATE: " + cluster['status']['state'])
    if cluster['clusterName'] == cluster_name and cluster['status']['state'] == 'RUNNING':
      cluster_running = True

  if cluster_running:
    break
  time.sleep(10)
  
job_id = submit_haddop_job(dataproc, project_id, region, cluster_name, bucket_name, "user_ids_answers")
## esperamos a que termine el job  
wait_for_job(dataproc, project_id, region, job_id)




CREATING
CREATING
CREATING
CREATING
CREATING
CREATING
CREATING
CREATING
CREATING
CREATING
CREATING
CREATING
CREATING
CREATING
CREATING
RUNNING
Submitted job ID d6e0e4b2-016c-438b-b949-b42e46b334ce
Waiting for job to finish...
Job finished.


{'driverControlFilesUri': 'gs://dataproc-d3c1f40b-c187-47f1-a030-87fd5670b613-europe-north1/google-cloud-dataproc-metainfo/364ad0fc-807e-4906-be34-c4accb397daa/jobs/d6e0e4b2-016c-438b-b949-b42e46b334ce/',
 'driverOutputResourceUri': 'gs://dataproc-d3c1f40b-c187-47f1-a030-87fd5670b613-europe-north1/google-cloud-dataproc-metainfo/364ad0fc-807e-4906-be34-c4accb397daa/jobs/d6e0e4b2-016c-438b-b949-b42e46b334ce/driveroutput',
 'hadoopJob': {'args': ['wordcount',
   'gs://dataproc-d3c1f40b-c187-47f1-a030-87fd5670b613-europe-north1/user_ids_answers',
   'output'],
  'mainJarFileUri': 'file:////usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'},
 'jobUuid': 'd6e0e4b2-016c-438b-b949-b42e46b334ce',
 'placement': {'clusterName': 'cluster-bdarchitecture-stackoverflow',
  'clusterUuid': '364ad0fc-807e-4906-be34-c4accb397daa'},
 'reference': {'jobId': 'd6e0e4b2-016c-438b-b949-b42e46b334ce',
  'projectId': 'positive-record-228820'},
 'status': {'state': 'DONE', 'stateStartTime': '2019-01-24T18:1

## Spring 5

Lanzamos un trabajo en hive que realiza las siguientes acciones:

* Creamos la tabla user_answers con el contenido del fichero generado en hdfs por el trabajo de WordCount
* Creamos la tabla users con el contenido del fichero que tiene el id del usuario, nombre, reputación y localización.
* Creamos la tabla externa users_most_actives que generará un archivo en el bucket de Storage con los usuarios más activos.
* Cargamos la tabla users_most_actives con una consulta en la que obtenemos los usuarios, con su id, su nombre y el número de respuestas que han realizado.
* Creamos la tabla externa locations_most_actives que generará un archivo en el bucket de Storage con las localizaciones con los usuarios más activos.
* Cargamos la tabla locations_most_actives con una consulta en la que obtenemos las localizaciones y el número de respuestas que han realizado los usuarios de cada localización.


In [0]:
!gsutil cp user_ids_names gs://$BUCKET_NAME/
## realizamos la consultas de hive
def submit_hive_job(dataproc, project, region,
                       cluster_name, querys):
  job_details = {
    'projectId': project,
    'job': {
      'placement': {
        'clusterName': cluster_name
      },
      'hiveJob': {
        "queryList": {
            "queries": querys
        }
      }
    }
  }
  #["CREATE TABLE IF NOT EXISTS users\n (user_id INT, name STRING, reputation INT, location STRING)\n row format delimited fields terminated by ',';\n\nLOAD DATA INPATH 'gs://dataproc-d3c1f40b-c187-47f1-a030-87fd5670b613-europe-north1/user_ids_names' INTO TABLE users;\n\nCREATE TABLE IF NOT EXISTS user_answers\n (user_id INT, n_answers INT) row format delimited fields terminated by '\\t';\n\nLOAD DATA INPATH 'hdfs:///user/root/output/*' INTO TABLE user_answers;\n\nCREATE EXTERNAL TABLE IF NOT EXISTS users_most_actives(\n user_id INT, name STRING, n_answers INT)\n ROW FORMAT DELIMITED\n FIELDS TERMINATED BY ','\n STORED AS TEXTFILE\n LOCATION 'gs://dataproc-d3c1f40b-c187-47f1-a030-87fd5670b613-europe-north1/users_most_actives';\n\nINSERT OVERWRITE TABLE users_most_actives SELECT DISTINCT users.user_id, users.name, user_answers.n_answers \nFROM users JOIN user_answers ON users.user_id = user_answers.user_id \nORDER BY n_answers DESC;"]
  result = dataproc.projects().regions().jobs().submit(
    projectId=project,
    region=region,
    body=job_details).execute()
  job_id = result['reference']['jobId']
  print('Submitted job ID {}'.format(job_id))
  return job_id


# preparamos las querys
sql_hive = []
sql_hive.append("CREATE TABLE IF NOT EXISTS users \
(user_id INT, name STRING, reputation INT, location STRING) \
row format delimited fields terminated by ',';")

sql_hive.append("LOAD DATA INPATH 'gs://{}/user_ids_names' INTO TABLE users;".format(bucket_name))

sql_hive.append("CREATE TABLE IF NOT EXISTS user_answers\
(user_id INT, n_answers INT) row format delimited fields terminated by '\t';")

sql_hive.append("LOAD DATA INPATH 'hdfs:///user/root/output/*' INTO TABLE user_answers;")

sql_hive.append("CREATE EXTERNAL TABLE IF NOT EXISTS users_most_actives(\
user_id INT, name STRING, n_answers INT) \
ROW FORMAT DELIMITED \
FIELDS TERMINATED BY ',' \
STORED AS TEXTFILE \
LOCATION 'gs://{}/users_most_actives';".format(bucket_name))

sql_hive.append("INSERT OVERWRITE TABLE users_most_actives SELECT DISTINCT users.user_id, users.name, user_answers.n_answers  \
FROM users JOIN user_answers ON users.user_id = user_answers.user_id  \
ORDER BY n_answers DESC;")

sql_hive.append("CREATE EXTERNAL TABLE IF NOT EXISTS localtions_most_actives( \
location STRING, n_answers INT) \
ROW FORMAT DELIMITED \
FIELDS TERMINATED BY ',' \
STORED AS TEXTFILE \
LOCATION 'gs://{}/locations_most_actives';".format(bucket_name))

sql_hive.append("INSERT OVERWRITE TABLE localtions_most_actives SELECT location, SUM(user_answers.n_answers) TOTAL \
FROM users JOIN user_answers ON users.user_id = user_answers.user_id  \
GROUP BY location \
ORDER BY TOTAL DESC;")

job_id = submit_hive_job(dataproc, project_id, region, cluster_name, sql_hive)
wait_for_job(dataproc, project_id, region, job_id)

Copying file://user_ids_names [Content-Type=application/octet-stream]...
/ [1 files][162.1 KiB/162.1 KiB]                                                
Operation completed over 1 objects/162.1 KiB.                                    
Submitted job ID 0904075e-78fa-4208-ab38-c8205428ed3b
Waiting for job to finish...
Job finished.


{'driverControlFilesUri': 'gs://dataproc-d3c1f40b-c187-47f1-a030-87fd5670b613-europe-north1/google-cloud-dataproc-metainfo/364ad0fc-807e-4906-be34-c4accb397daa/jobs/0904075e-78fa-4208-ab38-c8205428ed3b/',
 'driverOutputResourceUri': 'gs://dataproc-d3c1f40b-c187-47f1-a030-87fd5670b613-europe-north1/google-cloud-dataproc-metainfo/364ad0fc-807e-4906-be34-c4accb397daa/jobs/0904075e-78fa-4208-ab38-c8205428ed3b/driveroutput',
 'hiveJob': {'queryList': {'queries': ["CREATE TABLE IF NOT EXISTS users (user_id INT, name STRING, reputation INT, location STRING) row format delimited fields terminated by ',';",
    "LOAD DATA INPATH 'gs://dataproc-d3c1f40b-c187-47f1-a030-87fd5670b613-europe-north1/user_ids_names' INTO TABLE users;",
    "CREATE TABLE IF NOT EXISTS user_answers(user_id INT, n_answers INT) row format delimited fields terminated by '\t';",
    "LOAD DATA INPATH 'hdfs:///user/root/output/*' INTO TABLE user_answers;",
    "CREATE EXTERNAL TABLE IF NOT EXISTS users_most_actives(user_id I

## Consulta de resultados. 

Primero eliminamos el cluster

Luego nos traemos los archivos generado en hive y consultamos los resultados. 

Primero consultamos los usuarios mas activos.

In [0]:
!gsutil cp -r gs://$BUCKET_NAME/users_most_actives .
!cat users_most_actives/*

Copying gs://dataproc-d3c1f40b-c187-47f1-a030-87fd5670b613-europe-north1/users_most_actives/000000_0...
/ [1 files][ 73.2 KiB/ 73.2 KiB]                                                
Operation completed over 1 objects/73.2 KiB.                                     
1144035,Gordon Linoff,28
9209546,jpp,26
1118978,Simonare,23
1447675,Nina Scholz,19
6766919,Ashay Mandwarya,15
3528136,Eric Svitok,15
1691980,Bruno Caceiro,14
9332897,Nikola Gavric,14
10611983,Tomothy32,13
2901002,jezrael,13
2308683,cricket_007,13
10955263,04FS,13
654031,Chris Pratt,12
10953560,Chien Nguyen,12
8620333,Temani Afif,12
7964527,W-B,12
10953776,anand_v.singh,11
209103,Frank van Puffelen,11
2622292,SiddAjmera,11
3044560,Umair Farooq,11
3832970,Wiktor Stribiżew,11
2549110,Gauravsa,10
5529445,Yogesh Sharma,10
10938799,IAmNerd2000,10
4279120,olinox14,10
1419590,Saraband,10
1491895,Barmar,10
5648954,Nick Parsons,9
6914864,fa06,9
9518890,Matus Dubrava,9
104349,Daniel Roseman,9
2890724,Raydel Miranda,9
4909087,coldspeed

Ahora consultamos las localizaciones mas activas

In [0]:
!gsutil cp -r gs://$BUCKET_NAME/locations_most_actives .
!cat locations_most_actives/*

Copying gs://dataproc-d3c1f40b-c187-47f1-a030-87fd5670b613-europe-north1/locations_most_actives/000000_0...
/ [1 files][ 11.5 KiB/ 11.5 KiB]                                                
Operation completed over 1 objects/11.5 KiB.                                     
None,2851
London,395
New York,289
Nicosia,276
D&#252;sseldorf,230
Bangalore,158
Hyderabad,143
Pune,115
United States,106
Ahmedabad,94
Germany,93
Sydney,92
Dhaka,91
India,89
San Francisco,87
Houston,84
Warsaw,81
United Kingdom,75
Tun&#237;sia,72
Singapore,71
Coimbra,71
Netherlands,70
Arlington,70
France,69
Bratislava,68
Mumbai,65
Chennai,64
Hanoi,62
Austin,62
Bengaluru,61
Melbourne,59
Lahore,58
Abuja,56
Berlin,55
Seattle,51
Granville,50
Islamabad,45
Serbia,43
Kochi,42
UK,41
Israel,40
Slovakia,40
Jaipur,40
MD,40
Los Angeles,38
Australia,37
Cuba,36
Daejeon,35
Virginia,33
Poland,33
Barcelona,32
Paris,31
Cambridge,28
St. Gallen,28
Bordeaux,28
Munich,28
Dublin,28
Italy,27
Ottawa,27
Adelaide SA,27
Vienna,26
England,26
Californ

In [0]:
# Por ultimo, al no tener que usar mas el cluster, lo eliminamos
def delete_cluster(dataproc, project, region, cluster):
  print('Tearing down cluster')
  result = dataproc.projects().regions().clusters().delete(
    projectId=project,
    region=region,
    clusterName=cluster).execute()
  return result

delete_cluster(dataproc, project_id, region, cluster_name)

Tearing down cluster


{'metadata': {'@type': 'type.googleapis.com/google.cloud.dataproc.v1.ClusterOperationMetadata',
  'clusterName': 'cluster-bdarchitecture-stackoverflow',
  'clusterUuid': '364ad0fc-807e-4906-be34-c4accb397daa',
  'description': 'Delete cluster',
  'operationType': 'DELETE',
  'status': {'innerState': 'PENDING',
   'state': 'PENDING',
   'stateStartTime': '2019-01-24T18:19:46.933Z'}},
 'name': 'projects/positive-record-228820/regions/europe-west4/operations/c11f3534-17e7-412a-b4cc-fb8371622aba'}