![](resources/title.png)

# Tarea en Python usando PySpark para la asignatura de PAB

### El objetivo de la práctica es analizar los datos de indicadores de enfermedades crónicas que se proporcionan en el portal dado, de donde obtenemos un fichero .csv que contiene los datos a analizar.

### Usaremos la API de RDDs y la de datasets

### Autores:
* Miguel González 
* Samuel Delgado
* Victoria Cruz

## PyData.py con API RDDs

In [1]:
from pyspark import SparkConf, SparkContext
import sys
import csv
import time


# Count how many distinct values of column 6 appear for each value of column 5
# of the CSV file, using the RDD API, and report the elapsed wall-clock time.
file_name = "resources/U.S._Chronic_Disease_Indicators__CDI_.csv"

sc = SparkContext.getOrCreate()

# Raise the threshold of the "org" log4j logger to WARN.
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)

start_computing_time = time.time()

file = sc.textFile(file_name)
# The first line of the file is treated as the CSV header and dropped below.
header = file.first()


def parse_csv_line(line):
    """Split a single CSV text line into its list of fields."""
    return next(csv.reader([line], quotechar='"', delimiter=',',
                           quoting=csv.QUOTE_ALL, skipinitialspace=True))


d = (
    file.filter(lambda line: line != header)
    .map(parse_csv_line)
    # Presumably the Topic and Question columns -- confirm against the header.
    .map(lambda fields: (fields[5], fields[6]))
    # Deduplicate the (key, value) pairs first and then count one per pair.
    # Unlike groupByKey(), this never gathers every value of a key in memory
    # and lets Spark combine partial results before the shuffle.
    .distinct()
    .mapValues(lambda _: 1)
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

# The order of the collected pairs is not defined.
for pair in d:
    print(pair)

total_computing_time = time.time() - start_computing_time
print("Computing time: ", str(total_computing_time))

('Asthma', 9)
('Chronic Kidney Disease', 4)
('Older Adults', 4)
('Mental Health', 3)
('Immunization', 1)
('Disability', 1)
('Cancer', 20)
('Oral Health', 9)
('Tobacco', 16)
('Reproductive Health', 3)
('Arthritis', 10)
('Diabetes', 20)
('Cardiovascular Disease', 18)
('Overarching Conditions', 16)
('Alcohol', 16)
('Chronic Obstructive Pulmonary Disease', 16)
('Nutrition, Physical Activity, and Weight Status', 37)
Computing time:  9.13486099243164


## PyDataFrame.py con API datasets

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as func

import sys
import time


# DataFrame version of the analysis: load the CSV file, show the number of
# distinct questions per topic and the number of rows per (topic, location),
# then report the elapsed wall-clock time.
file_name = "resources/U.S._Chronic_Disease_Indicators__CDI_.csv"

spark_session = SparkSession.builder.getOrCreate()

# Raise the threshold of the "org" log4j logger to WARN.
logger = spark_session._jvm.org.apache.log4j
org_logger = logger.LogManager.getLogger("org")
org_logger.setLevel(logger.Level.WARN)

start_computing_time = time.time()

# Read the CSV taking column names from the first row and inferring the types.
csv_reader = spark_session.read.format("csv")
data_frame = csv_reader.options(header='true', inferschema='true').load(file_name)

# Distinct questions for every topic, listed alphabetically by topic.
questions_per_topic = data_frame.groupBy("Topic").agg(func.countDistinct("Question"))
questions_per_topic.sort("Topic").show()

# Row count for every (topic, location) combination, ordered by location.
rows_per_topic_and_location = data_frame.groupBy("Topic", "LocationDesc").count()
rows_per_topic_and_location.sort("LocationDesc").show()

total_computing_time = time.time() - start_computing_time
print("Computing time: ", str(total_computing_time))

+--------------------+------------------------+
|               Topic|count(DISTINCT Question)|
+--------------------+------------------------+
|             Alcohol|                      16|
|           Arthritis|                      10|
|              Asthma|                       9|
|              Cancer|                      20|
|Cardiovascular Di...|                      18|
|Chronic Kidney Di...|                       4|
|Chronic Obstructi...|                      16|
|            Diabetes|                      20|
|          Disability|                       1|
|        Immunization|                       1|
|       Mental Health|                       3|
|Nutrition, Physic...|                      37|
|        Older Adults|                       4|
|         Oral Health|                       9|
|Overarching Condi...|                      16|
| Reproductive Health|                       3|
|             Tobacco|                      16|
+--------------------+------------------