<a href="https://colab.research.google.com/github/joseluisgarciad/Arbol/blob/master/Copia_de_proyecto.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 1. Memoria del proyecto

### **1. Introducción**

Objetivo del proyecto:

Nuestro cliente es propietario de una plataforma educativa online, con presencia internacional. Entre su oferta, encontramos contenido especializado en el área de tecnología: programación, Big Data, desarrollo web, Apps móviles, etc. 

El servicio que llevaremos a cabo consistirá en orientarles acerca de cuáles de estos productos son los más demandados. Para ello nos basaremos en datos de registros y tendencias de uso, que pondrán énfasis en la información obtenida “en tiempo real”. De esta manera, nuestro cliente podrá tomar las decisiones más adecuadas de cara a su oferta educativa del próximo año. La optimización de sus recursos estará respaldada por nuestra tecnología y experiencia en Big Data.


###**2**. Diseño de arquitectura del proyecto



Como respuesta a las demandas de nuestro cliente, hemos desarrollado una arquitectura concreta, en la que distintos sistemas y herramientas nos llevarán a alcanzar la solución más adecuada, que nos muestre datos con significado para su compañía y sus siguientes objetivos estratégicos.

Hemos decidido dividir la estrategia en dos itinerarios. En primer lugar, para conocer qué productos son más demandados por sus potenciales clientes, obtuvimos registros de actividad en los foros más utilizados por programadores a nivel mundial (Stackoverflow.com). 
También se necesita saber de que lenguajes/tecnologias se habla en Twitter, en concreto se necesita saber en que paises y en que idiomas para realizar una estadistica.
A continuación hacemos un esquema de las arquitectura de ambos procesos.



**2.1 BASES DE DATOS DE ORIGEN STACKLITE**

- Realizamos la ingesta de datos mediante Sqoop, a través de Hive, llevando las Bases de datos originales a nuestro servidor Hdfs.






In [None]:
sqoop import --connect jdbc:mysql://10.0.4.201:3306/stacklite --username training --password training --table questions --hive-import

* Trabajamos estos datos a través de Spark en Python. Para ello, ejecutamos el siguiente archivo de configuración:

In [None]:
from pyspark.sql import SparkSession
spark.stop()
spark = (SparkSession
         .builder
         .appName("Python Spark SQL Hive")
         .config("spark.sql.warehouse.dir", "user/hive/warehouse")
         .config("hive.metastore.uris", "thrift://ip-10-0-4-11.eu-west-1.compute.internal:9083")
         .enableHiveSupport()
         .getOrCreate()
         )

Transformamos las bases de datos, unificándolas por el criterio común de id. Para las consultas, utilizamos la extensión de SparkSQL.

In [None]:
df_q = spark.sql(""" SELECT * FROM project_questions """)
df_q.printSchema()

df_qt = spark.sql(""" SELECT * FROM project_question_tags """)
df_qt.printSchema()

df_qt.registerTempTable('qt1')
sql_qt1=spark.sql('select * from qt1')
sql_qt1.show()



Con el siguiente comando, vemos el conteo de, en general, cuales son los lenguajes más demandados por los usuarios de los foros de esta plataforma:

In [None]:
df_qt_tmp=spark.sql('select qt1.tag, count(*) as contador from qt1 group by qt1.tag order by contador desc limit 20')
df_qt_tmp.show()

Sin embargo, creemos que un criterio más relevante para nuestro cliente sería obtener datos más actuales. Ordenaremos nuestros resultados por fecha, de más actual a más antiguo.

In [None]:
df_q.registerTempTable('q1')

df_fecha = spark.sql('select * from qt1, q1 where q1.usuario = qt1.id limit 20')
df_fecha.show()

df_fecha1 = spark.sql('select id,tag,creationdate from qt1, q1 where q1.usuario = qt1.id limit 20')
df_fecha1.show()


from pyspark.sql.functions import col, unix_timestamp, to_date
df_q=df_q.withColumn('fecha_a', to_date(unix_timestamp(col('creationdate'), 'yyyy-MM-dd').cast('timestamp')))
0df_q.show()

**2.2 Origen TWITTER** 

Mediante flume hacemos una ingesta en tiempo real de los datos de twitter de una lista de lenguajes/tecnologias procedentes de StackOverflow.
Para manejar una cantidad representativa de datos hacemos una extración de 22.000 tweets.
Nos encontramos con la circunstancia de que los usuarios en Twitter, al menos gran parte de ellos, no indican su país de residencia, lo que impide que el muestreo sea todo lo preciso que deberia ser.

Necesitamos saber el Lenguaje y la procedencia de los Tweets extraidos.

Usamos la herramienta Apache Flume, para ello hemos creado un fichero de configuración "twitter.conf" con los parametros necesarios para realizar la ingesta, entre esos datos indicamos como keywords los lenguajes/tecnologias, claves de acceso a Twitter, configuracion del sumidero, etc...

Lanzamos el comando:



In [None]:
flume-ng agent --conf /etc/flume-ng/conf --conf-file /home/jlgarcia/flume/tweets/twitter.conf --name TwitterAgent --plugins-path /usr/lib/flume-ng/plugins.d:/var/lib/flume-ng/plugins.d -Dflume.root.logger=INFO,console

Despues de 15 minutos de proceso, cancelamos para tener una cantidad representativa de Tweets, como no hay forma de saber exactamente cuantos registros lleva extraidos se hace a ojo.

Una vez que tenemos los ficheros flume en Hdfs procedemos a crear una sesión es PySpark para hacer el tratamiento de los datos.

In [None]:
from pyspark.sql import SparkSession
spark = (SparkSession
         .builder
         .appName("Python Spark SQL basic example")
         .config("spark.some.config.option", "some-value")
         .getOrCreate()
         )


Creamos un DataFrame en formato Json con los datos de los ficheros flume

*   Elemento de lista
*   Elemento de lista



In [None]:
filename = '*'
df = spark.read.format('json').option('head',True).load('hdfs:///user/jlgarcia/flume/tweets/' + filename)

Creamos una tabla temporal para poder acceder a los datos json convertidos mediante sintaxis SQL

In [None]:
df.registerTempTable('tweets')


Analizamos la estructura del archivo para localizar los campos que debemos usar para hacer las consultas

---



In [None]:
dftweetCompleto = spark.sql("SELECT * FROM tweets")
dftweetCompleto.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)

Empezamos con Idioma, por lo tanto usamos : lang

> Bloque con sangría



In [None]:
Tmp = spark.sql("SELECT lang, count(lang) as contador FROM tweets where lang <> 'und' GROUP BY lang ORDER BY contador DESC")
Tmp.show(100)

+----+--------+
|lang|contador|
+----+--------+
|  ja|    8253|
|  en|    5516|
|  es|    2009|
|  fr|     691|
|  ko|     483|
|  ru|     426|
|  de|     374|
|  ar|     357|
|  tr|     318|
|  th|     256|
|  pt|     250|
|  it|     232|

El segundo dato que necesitamos es el pais, por lo tanto usamos place.country y place.country_code

In [None]:
Tmp3 = spark.sql("SELECT place.country, place.country_code, count(*) as contador FROM tweets group by place.country, place.country_code")
Tmp3.show(10000)

+--------------------+------------+--------+
|             country|country_code|contador|
+--------------------+------------+--------+
|           Indonesia|          ID|       3|
|Emiratos Arabes     |          AE|       2|
|           Australia|          AU|       3|
|            Pakistan|          PK|       1|
|              Polska|          PL|       1|
|            Colombia|          CO|       1|
|            Thailand|          TH|       1|
|                null|        null|   22170|
|            Malaysia|          MY|       1|
|         Switzerland|          CH|       1|
|              México|          MX|       1|
|              España|          ES|      12|

Una vez que tenemos los resultados de las consultas tenemos que llevarlos a csv para poder importarlos desde "Tableau" para poder hacer los graficos.
Lo hacemos tanto para los idiomas como para los paises

In [None]:
Tmp.write.format("csv").save('hdfs:///user/jlgarcia/flume/tweets/TweetLang.csv')
hdfs dfs -cat /user/jlgarcia/flume/tweets/TweetLang.csv/* > resultado_lang.csv
Tmp3.write.format("csv").save('hdfs:///user/jlgarcia/flume/tweets/TweetCountry.csv')
hdfs dfs -cat /user/jlgarcia/flume/tweets/TweetCountry2.csv/* > resultado_country.csv

Importamos los ficheros "resultado_lang.csv" y "resultado_country.csv" en Tableau y hacemos los graficos

### 3. Resultados

•	Lenguajes y tecnologías más populares

Como vimos anteriormente, el conteo general de las tecnologías y herramientas más demandadas sería el siguiente:


Y con el criterio de ordenación por fecha:

•	Popularidad en función del país

•	Idioma de quien utiliza las herramientas

* Gráfico con 10 tecnologías más populares y su tendencia desde 2008 hasta 2017

##Ingesta de datos con sqoop

sqoop import-all-tables --connect jdbc:mysql://10.0.4.201:5432/stacklite \--username training \--password training \--target-dir /user/jlgarcia/grupob --verbose

sqoop import --connect jdbc:mysql://10.0.4.201:3306/stacklite --username training --password training --table question_tags --delete-target-dir --target-dir "/user/jlgarcia/grupob" --verbose

## importar con sqoop a través de Hive

sqoop import --connect jdbc:mysql://10.0.4.201:3306/stacklite --username training --password training --table question_tags --hive-import ((((con mi usuario -p))))

sqoop import --connect jdbc:mysql://10.0.4.201:3306/stacklite --username training --password training --table questions --hive-import

Archivo de configuración del agente flume Twitter

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

#TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = wWfHmif7iRq1h4GtzdhOUhHMc
TwitterAgent.sources.Twitter.consumerSecret = Vg2bKZkq3uxRqNihlEpZxzixwjvClxDzIshshO6k5K5du0gMvN
TwitterAgent.sources.Twitter.accessToken = 289473525-vurwFr068lcdKWQFodMcsqQAzLMCeq0w6ndMcvq6
TwitterAgent.sources.Twitter.accessTokenSecret = pHBak5EmBvW6iLPYRiSwKAO0b2ztgojC5ou4QoVvv0Lnt
TwitterAgent.sources.Twitter.keywords = python
#c#, c++, c, java, javascript, perl, ada, lisp, vb.net, pascal, cobol, php, pl/sql, ruby, fortran, algol, erlang

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = /user/jlgarcia/flume/tweets
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 1000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 10000

TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel


Archivo de configuración del agente flume Twitter

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

#TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = wWfHmif7iRq1h4GtzdhOUhHMc
TwitterAgent.sources.Twitter.consumerSecret = Vg2bKZkq3uxRqNihlEpZxzixwjvClxDzIshshO6k5K5du0gMvN
TwitterAgent.sources.Twitter.accessToken = 289473525-vurwFr068lcdKWQFodMcsqQAzLMCeq0w6ndMcvq6
TwitterAgent.sources.Twitter.accessTokenSecret = pHBak5EmBvW6iLPYRiSwKAO0b2ztgojC5ou4QoVvv0Lnt
TwitterAgent.sources.Twitter.keywords = python
#c#, c++, c, java, javascript, perl, ada, lisp, vb.net, pascal, cobol, php, pl/sql, ruby, fortran, algol, erlang

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = /user/jlgarcia/flume/tweets
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 1000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 10000

TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel




Comando para lanzar agente flume que carga Twitter

flume-ng agent -n TwitterAgent -c conf -f /home/jlgarcia/flume/tweets/twitter.conf -Dflume.root.logger=INFO, console

https://www.toptal.com/apache/tutorial-apache-spark-streaming-identificando-los-hashtags-de-tendencia-de-twitter/es

## **mysql esquema de acciones**
(https://drive.google.com/open?id=0B0p-gxIq9qEuQUFMSjY3Q3NVcElLLTI2VW5ERzFUck5RUnlF)


https://archive.org/details/stackexchange

SELECT * FROM project_questions, project_question_tags
WHERE project_questions.usuario = project_question_tags.id;