# Proyecto de limpieza de datos con dask

In [2]:
import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd

## Creación del cliente de Dask

In [None]:
client = Client(n_workers = 2, threads_per_worker=1, memory_limit='1GB')
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 2
Total threads: 2,Total memory: 1.86 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:32839,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 2
Started: Just now,Total memory: 1.86 GiB

0,1
Comm: tcp://127.0.0.1:40559,Total threads: 1
Dashboard: http://127.0.0.1:32975/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:36219,
Local directory: /tmp/dask-scratch-space/worker-p045btb6,Local directory: /tmp/dask-scratch-space/worker-p045btb6

0,1
Comm: tcp://127.0.0.1:38635,Total threads: 1
Dashboard: http://127.0.0.1:40677/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:45963,
Local directory: /tmp/dask-scratch-space/worker-3dqm07v4,Local directory: /tmp/dask-scratch-space/worker-3dqm07v4


## Lectura del archivo

In [4]:
beers = dd.read_csv('data/beer_small.csv', blocksize=20e5)
beers.compute()

Unnamed: 0.1,Unnamed: 0,brewery_id,brewery_name,review_time,review_overall,review_aroma,review_appearance,review_profilename,beer_style,review_palate,review_taste,beer_name,beer_abv,beer_beerid
0,784200,952,Great Dane Pub & Brewing Company (Downtown),1136269921,4.5,4.0,4.0,dirtylou,American IPA,4.0,4.0,Texas Speedbump IPA,,11846
1,1305265,29,Anheuser-Busch,1234830966,4.5,4.0,3.0,talkinghatrack,Light Lager,3.0,4.0,Bud Light Lime,4.2,41821
2,1526298,45,Brooklyn Brewery,1078599557,4.5,4.0,4.0,PopeJonPaul,Scotch Ale / Wee Heavy,4.0,4.5,Brooklyn Heavy Scotch Ale,7.5,16355
3,450647,590,New Glarus Brewing Company,1288790879,4.5,4.5,4.5,sweemzander,American Wild Ale,4.5,4.0,R&D Bourbon Barrel Kriek,5.5,60588
4,1223094,4,Allagash Brewing Company,1295320417,4.5,4.5,4.0,Jmoore50,American Wild Ale,4.0,4.0,Allagash Victor Francenstein,9.7,56665
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
15868,1291002,2378,"Kennebec Brewing Company, Inc.",1181265447,2.5,3.0,4.0,BuckSpin,American Stout,2.5,3.0,Gurglin' Sturgeon Stout,4.0,6071
15869,1388483,292,"Kirin Brewery Company, Limited",1202955854,4.5,3.5,3.5,saztheorybook,Happoshu,3.0,4.0,Sparkling Hop,5.0,40030
15870,1535177,1534,Brouwerij Het Anker,1267764634,4.5,4.0,4.0,SShelly,Belgian Strong Pale Ale,4.5,4.5,Cuvée Van De Keizer Rood (Red),10.0,42538
15871,1164804,10099,Dogfish Head Brewery,1322275887,4.5,4.5,4.0,therica,Russian Imperial Stout,4.5,4.5,Miles Davis' Bitches Brew,9.0,59151


Leemos el csv con Dask, adicionalmente le indicamos el blocksize que determinará el máximo del Dataframe de Dask que será leído

In [5]:
print(beers.dtypes)

Unnamed: 0                      int64
brewery_id                      int64
brewery_name          string[pyarrow]
review_time                     int64
review_overall                float64
review_aroma                  float64
review_appearance             float64
review_profilename    string[pyarrow]
beer_style            string[pyarrow]
review_palate                 float64
review_taste                  float64
beer_name             string[pyarrow]
beer_abv                      float64
beer_beerid                     int64
dtype: object


Este es un análisis preeliminar para saber con que estamos lidiando. Podemos observar que, aunque Dask sea *lazy*, podemos acceder siempre al nombre de las columnas y su tipo.

## Limpieza

Primero decidimos evaluar si las columnas numéricas tienen valores negativos, y si cualquier otra columna tiene valores en NaN o nulos.

In [6]:
def check_negative(column_name):
    negatives = beers[column_name] < 0
    return negatives.sum().compute()

def check_nan(column_name):
    nan = beers[column_name].isna().sum().compute()
    return nan

for col in beers.columns:
    if check_nan(col) != 0:
        print(f"La columna: {col} tiene valores nulos")
    elif beers[col].dtype == 'int64' or beers[col].dtype == 'float64':
        if check_negative(col) != 0:
            print(f"La columna: {col} tiene valores negativos")

La columna: brewery_name tiene valores nulos
La columna: review_profilename tiene valores nulos
La columna: beer_abv tiene valores nulos


In [7]:
print(beers.dtypes)

Unnamed: 0                      int64
brewery_id                      int64
brewery_name          string[pyarrow]
review_time                     int64
review_overall                float64
review_aroma                  float64
review_appearance             float64
review_profilename    string[pyarrow]
beer_style            string[pyarrow]
review_palate                 float64
review_taste                  float64
beer_name             string[pyarrow]
beer_abv                      float64
beer_beerid                     int64
dtype: object


Después, procedemos a ser más eficientes en memoria, aunque Dask nos ayude a paralelizar y hacer más efectivas las operaciones sobre el conjunto, queremos reducir su impacto en memoria. Por lo tano, verificamos si alguna de las columnas puede ser reducida en el número de bytes que ocupan su tipo de dato.

In [8]:
def shorten_int_float(df):
    for col in df.select_dtypes(include=['int64', 'float64']).columns:
        # Como en el analisis the arriba detectamos que no hay columnas con valores negativos entonces procedemos mas facilmente
        dtype = df[col].dtype
        if dtype == 'int64':
            max_val = df[col].max().compute()
            if max_val < 2**8:
                df[col] = df[col].astype('uint8')
            elif max_val < 2**16:
                df[col] = df[col].astype('uint16')
            elif max_val < 2**32:
                df[col] = df[col].astype('uint32')

        elif dtype == 'float64':
            max_val = df[col].max().compute()
            if max_val < 2**16:
                df[col] = df[col].astype('float16')
            elif max_val < 2**32:
                df[col] = df[col].astype('float32')

    return df

beers = shorten_int_float(beers)

Notamos que el reviewtime estaba en un formato desconocido por nosotros hasta el momento, así que decidimos convertirlo a un `datetime` que refleje mejor el tiempo.

In [9]:
def convert_review_time(ddf):

    # Se hace copia de los metadatos del ddf (esto es el esquema de las columnas)
    meta = ddf._meta.copy()

    # Cambiamos el tipo de dato en los metadatos para despues especificar en la lambda function
    meta['review_time'] = pd.to_datetime(meta['review_time'], unit='s')

    # Hacemos una lambda function sobre cada particion del ddf el cual assigna a una columna un tipo de dato diferente pero especificamos los metadatos.
    # Especificamos los metadatos para que a la hora de hacer el cambio de columna los otros tipos de datos no se cambien solos dada la inferencia de tipo de dato de dask.
    return ddf.map_partitions(lambda df: df.assign(review_time=dd.to_datetime(df['review_time'], unit='s')), meta=meta)

beers = convert_review_time(beers)

In [10]:
print(beers.dtypes)

Unnamed: 0                     uint32
brewery_id                     uint16
brewery_name          string[pyarrow]
review_time            datetime64[ns]
review_overall                float16
review_aroma                  float16
review_appearance             float16
review_profilename    string[pyarrow]
beer_style            string[pyarrow]
review_palate                 float16
review_taste                  float16
beer_name             string[pyarrow]
beer_abv                      float16
beer_beerid                    uint32
dtype: object


Notamos que si podíamos ser más eficientes reduciendo el espacio en memoria de enteros y flotantes en caso de que fuera posible. De igual manera, en el output de arriba donde checamos las columnas obtuvimos lo siguiente:
- La columna: brewery_name tiene valores nulos
- La columna: review_profilename tiene valores nulos
- La columna: beer_abv tiene valores nulos

Decidimos seguir la estrategia de imputación de "UNKNOWN" para las columnas de texto, ya que no podemos inferir la información de otra forma. Decidimos llenar los nulos numéricos de la columna `beer_abv` con la media, ya que se trata de un valor del volumen de alcohol, por lo que, en promedio son datos cercanos. Además, en consultas no perdemos ese valor representativo medio.

In [None]:
beers['brewery_name'] = beers['brewery_name'].fillna('UNKNOWN')
beers['review_profilename'] = beers['review_profilename'].fillna('UNKNOWN')
beers['beer_abv'] = beers['beer_abv'].fillna(beers['beer_abv'].mean().compute())
beers = beers.drop('Unnamed: 0', axis=1)

print(beers.isna().sum().compute())

brewery_id            0
brewery_name          0
review_time           0
review_overall        0
review_aroma          0
review_appearance     0
review_profilename    0
beer_style            0
review_palate         0
review_taste          0
beer_name             0
beer_abv              0
beer_beerid           0
dtype: int64


## Análisis estadístico utilizando Dask

Queremos realizar un análisis general sobre el Dataframe y aprovechar el procesamiento paralelo de Dask. Primero queremos hacer un resumen estadístico por cerveza y obtener aquella(s) que tiene(n) la mejor calificación general.

In [12]:
# Revisamos que hayan ids repetidos para poder agrupar
len(beers['beer_beerid'].unique())



23967

In [22]:
columnas_para_media = ['review_overall', 'review_aroma', 'review_appearance', 'review_palate', 'review_taste']
beers_mean = beers.groupby('beer_beerid')[columnas_para_media].mean()
beers_mean.compute()

Unnamed: 0_level_0,review_overall,review_aroma,review_appearance,review_palate,review_taste
beer_beerid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
3,4.000000,3.750000,3.750000,4.250000,4.000000
5,3.322222,3.111111,3.400000,3.277778,3.222222
6,3.766234,3.564935,3.935065,3.577922,3.675325
7,3.344262,3.254098,3.245902,2.983607,3.114754
8,3.714286,2.928571,3.142857,3.571429,3.071429
...,...,...,...,...,...
76671,4.500000,4.000000,4.000000,4.500000,4.500000
76927,4.000000,3.500000,4.000000,3.500000,4.000000
76950,3.500000,3.500000,4.000000,3.500000,3.500000
76989,4.500000,5.000000,4.500000,4.500000,4.500000


In [24]:
max_val = beers_mean['review_overall'].max()
fila_max = beers_mean[beers_mean['review_overall'] == max_val]
fila_max.compute()

Unnamed: 0_level_0,review_overall,review_aroma,review_appearance,review_palate,review_taste
beer_beerid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
3378,5.0,3.0,4.0,4.0,4.5
5650,5.0,5.0,5.0,5.0,5.0
7794,5.0,4.5,4.0,4.0,4.5
8627,5.0,5.0,4.5,5.0,5.0
8773,5.0,4.5,4.0,4.0,5.0
...,...,...,...,...,...
52845,5.0,4.0,3.5,4.5,4.0
53025,5.0,5.0,5.0,5.0,5.0
56603,5.0,4.0,4.5,5.0,4.5
59530,5.0,4.0,4.0,4.5,4.5


Repetimos el mismo análisis para la 'brewery'.

In [15]:
len(beers['brewery_id'].unique())

3832

In [25]:
columnas_para_media = ['review_overall', 'review_aroma', 'review_appearance', 'review_palate', 'review_taste']
breweries_mean = beers.groupby('brewery_id')[columnas_para_media].mean()
breweries_mean.compute()

Unnamed: 0_level_0,review_overall,review_aroma,review_appearance,review_palate,review_taste
brewery_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
1,3.845865,3.454887,3.695489,3.601504,3.669173
2,3.187500,3.187500,3.125000,3.187500,3.062500
3,3.575490,3.425490,3.616667,3.403922,3.434314
4,4.081006,4.032123,4.053771,4.022346,4.101955
5,3.657895,3.578947,3.605263,3.414474,3.552632
...,...,...,...,...,...
26943,2.000000,3.000000,3.500000,1.500000,1.500000
27038,3.000000,3.500000,4.000000,3.500000,3.500000
27329,4.000000,4.000000,4.000000,4.000000,4.000000
27742,3.500000,3.000000,3.500000,4.000000,3.500000


In [26]:
val_max = breweries_mean['review_overall'].max()
fila_max = breweries_mean[breweries_mean['review_overall'] == val_max]
fila_max.compute()

Unnamed: 0_level_0,review_overall,review_aroma,review_appearance,review_palate,review_taste
brewery_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
5961,5.0,4.5,4.5,4.5,5.0
8593,5.0,4.0,4.5,4.0,5.0
3960,5.0,3.0,4.0,3.5,4.0
5047,5.0,5.0,5.0,5.0,4.0
12706,5.0,4.5,4.5,4.5,5.0
21109,5.0,4.0,4.0,4.0,4.0
3048,5.0,3.5,4.0,4.0,3.5
15457,5.0,4.5,4.5,5.0,5.0
5499,5.0,3.5,4.0,3.5,3.5
9050,5.0,4.5,4.5,5.0,5.0


Después, quisimos averiguar cuál es el 'estilo' de cerveza que más veces fue reseñado.

In [34]:
beer_style_counts = beers.groupby('beer_style')['beer_style'].count().compute().reset_index(name='count')
beer_style_counts = beer_style_counts.sort_values(by='count', ascending=False)
beer_style_counts

Unnamed: 0,beer_style,count
12,American IPA,11807
9,American Double / Imperial IPA,8566
14,American Pale Ale (APA),6221
89,Russian Imperial Stout,5414
11,American Double / Imperial Stout,5074
...,...,...
56,Faro,64
48,English Pale Mild Ale,58
88,Roggenbier,40
64,Happoshu,30


Esta consulta obtiene cuál es el perfil que más reseñas dejó sobre cervezas. Una vez obtenido el resultado, y dado que eran demasiadas reseñas, quisimos saber cuanto tiempo tardó en reseñar todas las cervezas (aquí fue útil la conversión de `review_time` a datetime).

In [36]:
reviewers = beers.groupby('review_profilename')['review_profilename'].count().compute().reset_index(name='count')
reviewers = reviewers.sort_values(by='count', ascending=False)
reviewers

Unnamed: 0,review_profilename,count
4347,northyorksammy,547
4148,mikesgroove,484
376,BuckeyeNation,450
2057,Thorpe429,381
439,ChainGangGuy,362
...,...,...
14909,waltaburge,1
14910,wbrown,1
14911,wertperch,1
14912,wfuqua,1


In [40]:
from datetime import timedelta
time_spent = beers[beers['review_profilename'] == 'northyorksammy']
time_spent = time_spent[['review_profilename', 'review_time']]
max_time = time_spent['review_time'].max().compute()
min_time = time_spent['review_time'].min().compute()

time_sammy_spent = max_time - min_time
time_sammy_spent = time_sammy_spent.total_seconds()/(365.25 * 24 * 3600)
time_sammy_spent

8.112954407179252

Finalmente quisimos hacer un agregado por cervecería y estilo de cerveza en el que, no aprovechando tanto Dask por el abuso de `compute()`, podemos hacer un uso muy similar que con pandas. Se obtiene cuál de este par de columnas tiene más reseñas, promedio y mayor cantidad de alcohol promedio por bebida.
De esta forma obtenemos una medida de confianza (número de reseñas), calidad (reseña general promedio) y cantidad de alcohol.

In [47]:
review_counts = beers.groupby(['brewery_name', 'beer_style']).size().compute().reset_index(name='review_count')
review_counts = review_counts.sort_values(by='review_count', ascending=False)

review_avg = beers.groupby(['brewery_name', 'beer_style'])['review_overall'].mean().compute().reset_index(name='review_overall_avg')
abv_avg = beers.groupby(['brewery_name', 'beer_style'])['beer_abv'].mean().compute().reset_index(name='beer_abv_avg')

result = review_counts.merge(review_avg, on=['brewery_name', 'beer_style'])
result = result.merge(abv_avg, on=['brewery_name', 'beer_style'])

result

Unnamed: 0,brewery_name,beer_style,review_count,review_overall_avg,beer_abv_avg
0,Sierra Nevada Brewing Co.,American IPA,964,4.141598,6.751483
1,Stone Brewing Co.,American Strong Ale,814,4.056511,8.291476
2,Dogfish Head Brewery,American Double / Imperial IPA,746,3.946381,10.898794
3,Founders Brewing Company,American Double / Imperial Stout,503,4.375746,9.711015
4,Unibroue,Belgian Strong Dark Ale,489,4.146217,8.964213
...,...,...,...,...,...
17904,Harbor City Brewing Company,English Brown Ale,1,3.500000,-1.000000
17905,Hangar 24 Brewery,Irish Dry Stout,1,4.500000,4.800781
17906,Ankerbräu Nördlingen,German Pilsener,1,3.500000,4.699219
17907,Angry Cedar Brewing Company,American Amber / Red Ale,1,1.500000,4.500000


In [48]:
beers.compute()

  has_large_values = (abs_vals > 1e6).any()


Unnamed: 0,brewery_id,brewery_name,review_time,review_overall,review_aroma,review_appearance,review_profilename,beer_style,review_palate,review_taste,beer_name,beer_abv,beer_beerid
0,952,Great Dane Pub & Brewing Company (Downtown),2006-01-03 06:32:01,4.5,4.0,4.0,dirtylou,American IPA,4.0,4.0,Texas Speedbump IPA,-1.000000,11846
1,29,Anheuser-Busch,2009-02-17 00:36:06,4.5,4.0,3.0,talkinghatrack,Light Lager,3.0,4.0,Bud Light Lime,4.199219,41821
2,45,Brooklyn Brewery,2004-03-06 18:59:17,4.5,4.0,4.0,PopeJonPaul,Scotch Ale / Wee Heavy,4.0,4.5,Brooklyn Heavy Scotch Ale,7.500000,16355
3,590,New Glarus Brewing Company,2010-11-03 13:27:59,4.5,4.5,4.5,sweemzander,American Wild Ale,4.5,4.0,R&D Bourbon Barrel Kriek,5.500000,60588
4,4,Allagash Brewing Company,2011-01-18 03:13:37,4.5,4.5,4.0,Jmoore50,American Wild Ale,4.0,4.0,Allagash Victor Francenstein,9.703125,56665
...,...,...,...,...,...,...,...,...,...,...,...,...,...
15868,2378,"Kennebec Brewing Company, Inc.",2007-06-08 01:17:27,2.5,3.0,4.0,BuckSpin,American Stout,2.5,3.0,Gurglin' Sturgeon Stout,4.000000,6071
15869,292,"Kirin Brewery Company, Limited",2008-02-14 02:24:14,4.5,3.5,3.5,saztheorybook,Happoshu,3.0,4.0,Sparkling Hop,5.000000,40030
15870,1534,Brouwerij Het Anker,2010-03-05 04:50:34,4.5,4.0,4.0,SShelly,Belgian Strong Pale Ale,4.5,4.5,Cuvée Van De Keizer Rood (Red),10.000000,42538
15871,10099,Dogfish Head Brewery,2011-11-26 02:51:27,4.5,4.5,4.0,therica,Russian Imperial Stout,4.5,4.5,Miles Davis' Bitches Brew,9.000000,59151


# Limpieza de datos usando unicamente pandas

In [None]:
beers = pd.read_csv('data/beer_small.csv')
beers

Unnamed: 0.1,Unnamed: 0,brewery_id,brewery_name,review_time,review_overall,review_aroma,review_appearance,review_profilename,beer_style,review_palate,review_taste,beer_name,beer_abv,beer_beerid
0,784200,952,Great Dane Pub & Brewing Company (Downtown),1136269921,4.5,4.0,4.0,dirtylou,American IPA,4.0,4.0,Texas Speedbump IPA,,11846
1,1305265,29,Anheuser-Busch,1234830966,4.5,4.0,3.0,talkinghatrack,Light Lager,3.0,4.0,Bud Light Lime,4.2,41821
2,1526298,45,Brooklyn Brewery,1078599557,4.5,4.0,4.0,PopeJonPaul,Scotch Ale / Wee Heavy,4.0,4.5,Brooklyn Heavy Scotch Ale,7.5,16355
3,450647,590,New Glarus Brewing Company,1288790879,4.5,4.5,4.5,sweemzander,American Wild Ale,4.5,4.0,R&D Bourbon Barrel Kriek,5.5,60588
4,1223094,4,Allagash Brewing Company,1295320417,4.5,4.5,4.0,Jmoore50,American Wild Ale,4.0,4.0,Allagash Victor Francenstein,9.7,56665
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
158656,1291002,2378,"Kennebec Brewing Company, Inc.",1181265447,2.5,3.0,4.0,BuckSpin,American Stout,2.5,3.0,Gurglin' Sturgeon Stout,4.0,6071
158657,1388483,292,"Kirin Brewery Company, Limited",1202955854,4.5,3.5,3.5,saztheorybook,Happoshu,3.0,4.0,Sparkling Hop,5.0,40030
158658,1535177,1534,Brouwerij Het Anker,1267764634,4.5,4.0,4.0,SShelly,Belgian Strong Pale Ale,4.5,4.5,Cuvée Van De Keizer Rood (Red),10.0,42538
158659,1164804,10099,Dogfish Head Brewery,1322275887,4.5,4.5,4.0,therica,Russian Imperial Stout,4.5,4.5,Miles Davis' Bitches Brew,9.0,59151


In [None]:
def shorten_int_float(df):
    for col in df.select_dtypes(include=['int64', 'float64']).columns:
        # Como en el analisis the arriba detectamos que no hay columnas con valores negativos entonces procedemos mas facilmente
        dtype = df[col].dtype
        if dtype == 'int64':
            max_val = df[col].max()
            if max_val < 2**8:
                df[col] = df[col].astype('uint8')
            elif max_val < 2**16:
                df[col] = df[col].astype('uint16')
            elif max_val < 2**32:
                df[col] = df[col].astype('uint32')

        elif dtype == 'float64':
            max_val = df[col].max()
            if max_val < 2**16:
                df[col] = df[col].astype('float16')
            elif max_val < 2**32:
                df[col] = df[col].astype('float32')

    return df

beers = shorten_int_float(beers)

In [None]:
beers['review_time'] = pd.to_datetime(beers['review_time'], unit='s')
beers['brewery_name'] = beers['brewery_name'].fillna('UNKOWN')
beers['review_profilename'] = beers['review_profilename'].fillna('UNKOWN')
beers['beer_abv'] = beers['beer_abv'].fillna(beers['beer_abv'].mean())
beers = beers.drop('Unnamed: 0', axis=1)
print(beers.isna().sum())
beers.dtypes

brewery_id            0
brewery_name          0
review_time           0
review_overall        0
review_aroma          0
review_appearance     0
review_profilename    0
beer_style            0
review_palate         0
review_taste          0
beer_name             0
beer_abv              0
beer_beerid           0
dtype: int64


brewery_id                    uint16
brewery_name                  object
review_time           datetime64[ns]
review_overall               float16
review_aroma                 float16
review_appearance            float16
review_profilename            object
beer_style                    object
review_palate                float16
review_taste                 float16
beer_name                     object
beer_abv                     float16
beer_beerid                   uint32
dtype: object

Notar que se sigue el mismo proceso que en Dask en la mayoría de las consultas (veremos cuando no en el cuaderno 03). Solamente, en este caso, estamos perdiendo cómputo paralelo que ganábamos con Dask y la oportunidad de hacer las operaciones *lazy* y ejecutarlas con el `collect()`.