# Redis Normalized Data Transformation

## Install Necessary Modules

In [None]:
!pip install redis

## Start Redis Stack Container

In [None]:
!docker compose up -d

## Establish Redis Client Connectivity

In [None]:
import csv
import redis
from redis.commands.search.field import NumericField, TagField, TextField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.aggregation import AggregateRequest, Desc
from redis.commands.search.query import Query
from time import perf_counter
import json

# Connection settings for the Redis server on localhost; the blank
# credentials are passed through exactly as given.
connection_settings = {
    'host': 'localhost',
    'port': 6379,
    'username': '',
    'password': '',
}
client = redis.Redis(**connection_settings)

# Empty the currently selected database so the notebook starts from a clean
# slate, then ping the server; the ping reply is this cell's displayed output.
client.flushdb()
client.ping()

## Create Redis Index

In [None]:
# Index JSON documents whose keys start with either of these prefixes.
idx_def = IndexDefinition(
    prefix=['brewery:', 'beer:'],
    index_type=IndexType.JSON,
)

# Two tag fields read from the document root, followed by two numeric fields
# collected from every element of the `beers` array.
schema = [
    TagField('$.city', as_name='city'),
    TagField('$.state', as_name='state'),
]
schema += [
    NumericField(f'$.beers[*].{attr}', as_name=attr) for attr in ('abv', 'ibu')
]

beer_index = client.ft('beer_idx')
beer_index.create_index(schema, definition=idx_def)



## Load Data

In [None]:
def _to_number(raw, digits):
    """Parse `raw` as a float rounded to `digits` places; return None if it is not numeric.

    None is serialized as JSON null. The `abv`/`ibu` paths are declared as
    NUMERIC fields in the index, so storing the raw (often empty) CSV string
    there would put a non-numeric value in a numeric path.
    """
    try:
        return round(float(raw), digits)
    except (TypeError, ValueError):
        return None


# All writes are queued on a non-transactional pipeline and flushed once per file.
pipe = client.pipeline(transaction=False)

# Pass 1: one JSON document per brewery, with an empty `beers` array to append to.
num_breweries = 0
with open('./data/breweries.csv', mode='r', newline='') as file:  # newline='' per csv module docs
    for brewery in csv.DictReader(file):
        brewery['beers'] = []
        pipe.json().set(f'brewery:{brewery["brewery_id"]}', '$', brewery)
        num_breweries += 1
pipe.execute()

# Pass 2: append each beer to the `beers` array of its parent brewery document.
num_beers = 0
with open('./data/beers.csv', mode='r', newline='') as file:
    for beer in csv.DictReader(file):
        # The brewery id only selects the parent key; it is not stored on the beer.
        key = f'brewery:{beer.pop("brewery_id")}'
        beer['abv'] = _to_number(beer['abv'], 3)
        beer['ibu'] = _to_number(beer['ibu'], 1)
        pipe.json().arrappend(key, '$.beers', beer)
        num_beers += 1
pipe.execute()

print(f'{num_breweries} breweries loaded.')
print(f'{num_beers} beers loaded.')

## Scenario 1
### Find all beers sold by a brewery with a known ID

In [None]:
brewery_id = 525
beers_path = '$.beers'

# Time a single JSON.GET of the embedded beers array for this brewery key.
started = perf_counter()
results = client.json().get(f'brewery:{brewery_id}', beers_path)
finished = perf_counter()

elapsed_ms = round((finished - started) * 1000, 2)
print(f'Exec time: {elapsed_ms} ms')
print(json.dumps(results, indent=2))

## Scenario 2
### Find the names of 4 beers sold in the state of Missouri

In [None]:
state = 'MO'

# Tag filter on the state field; project the beer-name path as `beer_name`
# and limit the page to the first four matching documents.
query = Query(f'(@state:{{{state}}})')
query = query.return_field('$.beers[*].name', as_field="beer_name").paging(0, 4)

started = perf_counter()
results = client.ft('beer_idx').search(query)
finished = perf_counter()
print(f'Exec time: {round((finished - started) * 1000, 2)} ms')

# One projected name per returned document.
names = [doc.beer_name for doc in results.docs]
print(json.dumps(names, indent=2))

## Scenario 3
### Find the beers sold in Denver with an ABV greater than or equal to 6% and IBU greater than or equal to 60

In [None]:
city = 'Denver'
abv = '0.06'
ibu = '60.0'

# Per-beer filter applied inside each matching brewery document.
beer_filter = f'$.beers[?(@.abv>={abv} && @.ibu>={ibu})]'

# The index-level predicates select candidate breweries; the JSONPath in LOAD
# then keeps only the beers that satisfy both thresholds. Dialect 3 is
# requested so a path with several matches comes back as a JSON array rather
# than only its first match.
# NOTE(review): requires a server with DIALECT 3 support - confirm version.
request = (
    AggregateRequest(f'@city:{{{city}}} @abv:[{abv} +inf] @ibu:[{ibu} +inf]')
    .load(beer_filter)
    .dialect(3)
)
t1 = perf_counter()
results = client.ft('beer_idx').aggregate(request)
t2 = perf_counter()
print(f'Exec time: {round((t2-t1)*1000,2)} ms')

beers = []
for row in results.rows:
    # The numeric predicates are evaluated across the whole beers array, so a
    # brewery can match through two different beers and have no single beer
    # that passes the per-beer filter; such rows carry no loaded value.
    if len(row) < 2 or row[1] is None:
        continue
    matches = json.loads(row[1])
    # Accept either an array of beers or a single beer object.
    if isinstance(matches, list):
        beers.extend(matches)
    else:
        beers.append(matches)
print(json.dumps(beers, indent=2))


## Shut Down Redis Stack Container

In [None]:
# Tear down the compose services that were started earlier in this notebook.
!docker compose down