## Use case 4 - From S3 To DynamoDB

In this case, we are going to retrieve movie data from a public dataset available at: https://www.kaggle.com/sankha1998/tmdb-top-10000-popular-movies-dataset

The content of the dataset looks like this:

#### TMDb_updated.CSV

>`
,title,overview,original_language,vote_count,vote_average
0,Ad Astra,"The near future...",en,2853,5.9
1,Bloodshot,"After he ...",en,1349,7.2
2,Bad Boys for Life,"Marcus and Mike ...",en,2530,7.1
`

Once TMDb_updated.csv is downloaded, we upload it to our S3 bucket. In this case, instead of loading the whole dataset into our NoSQL table, we will store in DynamoDB only the title, the average rating, and the overview of the movies that have received more than 10,000 votes.
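
Before involving S3, the selection rule can be checked locally on the sample rows shown above. The following is only a minimal sketch: the helper name `filter_by_votes` and the `SAMPLE_CSV` constant are hypothetical, and the lower threshold in the second call is purely illustrative, chosen so that the three sample rows produce a non-empty result.

```python
import csv
import io

# Sample rows copied from the excerpt above; the first column has no header name.
SAMPLE_CSV = '''\
,title,overview,original_language,vote_count,vote_average
0,Ad Astra,"The near future...",en,2853,5.9
1,Bloodshot,"After he ...",en,1349,7.2
2,Bad Boys for Life,"Marcus and Mike ...",en,2530,7.1
'''


def filter_by_votes(csv_text, min_votes):
    """Return (title, overview, vote_average) for rows with more than min_votes votes."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        (row['title'], row['overview'], float(row['vote_average']))
        for row in reader
        if int(row['vote_count']) > min_votes
    ]


# None of the three sample rows passes the threshold used in this use case.
print(filter_by_votes(SAMPLE_CSV, 10000))  # → []

# An illustrative lower threshold keeps 'Ad Astra' and 'Bad Boys for Life'.
print(filter_by_votes(SAMPLE_CSV, 2500))
```

The comparison is strict (`>`), which matches the `WHERE` clause of the query used later in this use case.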

#### S3Select
To perform this query from Python and automate the ETL process, we will use S3Select to retrieve the subset of data.


<div class=" alert alert-block alert-info">
<h3>S3Select vs AWS Athena</h3>
This kind of processing is often more convenient in AWS Athena, which can join different datasets. S3Select only works with a single table (one object) per query.
</div>

Specifically, we will execute the following query<br>
> `SELECT s.title, s.overview, s.vote_count, s.vote_average 
 FROM s3object s 
 WHERE cast(s.vote_count as int)> 10000`<br>

and store the result in a new CSV within the same bucket:

In [9]:
import boto3
import os

In [10]:
# The credentials are read from environment variables

AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')

AWS_REGION = os.getenv('AWS_REGION')

BUCKET_NAME = 'my-k-new-bucket'

In [11]:
s3client = boto3.client(
    's3', 
    region_name= AWS_REGION, 
    aws_access_key_id= AWS_ACCESS_KEY, 
    aws_secret_access_key= AWS_SECRET_KEY
)

# 1.- We perform the query using S3Select
resp = s3client.select_object_content(
    Bucket= BUCKET_NAME,
    Key= 'TMDb_updated.csv',
    ExpressionType='SQL',
    Expression="SELECT s.title, s.overview, s.vote_count, s.vote_average \
                FROM s3object s \
                WHERE cast(s.vote_count as int)> 10000",
    InputSerialization={'CSV': {"FileHeaderInfo": "USE",
                                'AllowQuotedRecordDelimiter': True},
                        'CompressionType': 'NONE'},
    OutputSerialization={'CSV': {}},
)


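# 2.- We collect the data we receive in streaming as bytes. A chunk can end
#     in the middle of a multi-byte UTF-8 character, so we decode only once,
#     after joining all the chunks
registros = []
for evento in resp['Payload']:
    if 'Records' in evento:
        registros.append(evento['Records']['Payload'])

# 3.- We generate the content as a String, prepending the CSV header
file_str = "title,overview,vote_count,vote_average\n" + b''.join(registros).decode('utf-8')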

# 4.- We create a new object in S3
s3client.put_object(Body=file_str, Bucket= BUCKET_NAME,
              Key= 'TMDb_filtered.CSV')

{'ResponseMetadata': {'RequestId': 'SQC9V51JYMBYNZ8K',
  'HostId': 'WtPzxCG/nnzJzUyMDySnXl/gYH9BDCo+btuJbNgcNi7wkkKbp2IPoqiz36p3nVn9xsjtcSQ9bHE=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'WtPzxCG/nnzJzUyMDySnXl/gYH9BDCo+btuJbNgcNi7wkkKbp2IPoqiz36p3nVn9xsjtcSQ9bHE=',
   'x-amz-request-id': 'SQC9V51JYMBYNZ8K',
   'date': 'Fri, 21 Apr 2023 15:08:26 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"0e46f2eef741ecda8647ca3b748b1432"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"0e46f2eef741ecda8647ca3b748b1432"',
 'ServerSideEncryption': 'AES256'}
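
Step 2 of the cell above reads an event stream in which only the `Records` events carry data. As a rough sketch, that step can be isolated in a small function and tried without AWS. The name `collect_records` and the hand-made `fake_events` list are assumptions for illustration; they only mimic the shape of `resp['Payload']` and are not real S3 output.

```python
def collect_records(events):
    """Join the 'Records' payloads of an S3 Select event stream into one string."""
    chunks = [
        event['Records']['Payload']
        for event in events
        if 'Records' in event
    ]
    # Decode once, after joining: a chunk may end inside a multi-byte character.
    return b''.join(chunks).decode('utf-8')


# Hand-made events (hypothetical data) that imitate the structure of the stream.
encoded = 'Amélie,plot,12000,7.9\n'.encode('utf-8')
fake_events = [
    {'Records': {'Payload': encoded[:3]}},   # ends in the middle of 'é'
    {'Stats': {'Details': {}}},              # non-data events are skipped
    {'Records': {'Payload': encoded[3:]}},
    {'End': {}},
]

print(collect_records(fake_events))
```

Joining the raw bytes before decoding keeps the result correct even when a record or a character is split between two consecutive `Records` events.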

### From S3 to DynamoDB
Once the file is created in S3, we load its data into DynamoDB. Since the dataset does not contain the movie release date, we will assign the year 2022 to all movies in our case:

In [14]:
import pandas as pd
from decimal import Decimal

# 1.- We read the file from S3 and put it into a DataFrame

response = s3client.get_object(Bucket= BUCKET_NAME, Key= 'TMDb_filtered.CSV')
movies_df = pd.read_csv(response['Body'], delimiter = ',')

# 2.- We connect to DynamoDB
dynamodb = boto3.resource(
    'dynamodb', 
    region_name= AWS_REGION, 
    aws_access_key_id= AWS_ACCESS_KEY, 
    aws_secret_access_key= AWS_SECRET_KEY
)
table = dynamodb.Table('FilmsData')

# 3.- We insert it into DynamoDB using a batch
with table.batch_writer() as batch:
    for index, fila in movies_df.iterrows():
        Item = {
            'year': 2022,
            'title': str(fila['title']),
            'info': {
                'plot' : fila['overview'],
                'rating' : Decimal(fila['vote_average']).quantize(Decimal('1.00'))
            }
        }
        batch.put_item(Item=Item)
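
The construction of each item can also be isolated and checked without connecting to DynamoDB. The sketch below is an assumption-laden variant of the loop body: `row_to_item` is a hypothetical helper, the rating goes through `str` before becoming a `Decimal`, and a missing overview (which pandas reads as a float `NaN`, a type the boto3 serializer does not accept) is replaced by an empty string.

```python
import math
from decimal import Decimal


def row_to_item(row, year=2022):
    """Build the DynamoDB item for one row (a dict or a pandas Series)."""
    overview = row['overview']
    # An empty CSV cell arrives as float NaN; store an empty plot instead.
    if isinstance(overview, float) and math.isnan(overview):
        overview = ''
    return {
        'year': year,
        'title': str(row['title']),
        'info': {
            'plot': overview,
            # Going through str avoids the long binary expansion of the float.
            'rating': Decimal(str(row['vote_average'])).quantize(Decimal('1.00')),
        },
    }


# Sample row taken from the excerpt at the top of this use case.
item = row_to_item(
    {'title': 'Ad Astra', 'overview': 'The near future...', 'vote_average': 5.9}
)
print(item['info']['rating'])  # → 5.90
```

With a helper like this, the body of the loop would reduce to `batch.put_item(Item=row_to_item(fila))`.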