In [None]:
# Install PySpark and boto3 (unpinned — consider pinning versions so re-runs are reproducible).
! pip install pyspark boto3
# AWS CLI, which provides the `aws` command used on the next line.
! pip install -q awscli
# Interactive prompt for AWS credentials/region; boto3 later picks these up from the CLI's config files.
# NOTE(review): avoid committing outputs of this cell; prefer environment variables for credentials.
! aws configure

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import boto3
from boto3.dynamodb.conditions import Key
import json

# Start (or reuse) the notebook's Spark session.
spark = SparkSession.builder.appName("PySpark").getOrCreate()

# Input file: semicolon-delimited CSV with a header row (presumably uploaded to Colab's /content).
file_path = "/content/dados_importar.csv"

# All columns are read as strings (no schema inference).
df = (
    spark.read
    .options(header=True, delimiter=";")
    .csv(file_path)
)

In [None]:
# Clean the raw task rows: filter unwanted rows, drop unused columns, and
# rename the remaining ones to the attribute names written to DynamoDB.
#
# The row filters must run BEFORE the drop: they reference "Usuário", and
# filtering on a column that has already been dropped raises an AnalysisException.
# Note: `!=` evaluates to null when the input is null, so rows with a null
# "Status Descrição" or "Usuário" are excluded as well.
df = (
    df
    .filter(col("Status Descrição") != "Cancelado")
    .filter(col("Usuário") != "Pedro da Silva")
    .drop("Tipo da Tarefa ID", "Data de Conclusão", "Status", "Usuário")
)

# Source CSV column -> target attribute name.
# "ID do Usuário" becomes PK here; the PK cell further down overwrites it with a fixed user key.
COLUMN_RENAMES = {
    'Nome da Tarefa': 'name',
    'Tipo da Tarefa': 'type_task',
    'Data de Criação': 'date',
    'Status Descrição': 'status',
    'ID do Usuário': 'PK',
}
for source_name, target_name in COLUMN_RENAMES.items():
    df = df.withColumnRenamed(source_name, target_name)

In [None]:
# Map the Portuguese status labels to the app's values by EXACT value.
# DataFrame.replace swaps whole cell values only, whereas regexp_replace would
# also rewrite substrings (e.g. "Não Concluído" -> "Não done").
# Any other status values are left unchanged.
df = df.replace({'Concluído': 'done', 'A Fazer': 'todo'}, subset=['status'])

# Parse the creation date with Spark's default date parsing.
# NOTE(review): assumes the CSV stores ISO-like dates (yyyy-MM-dd...) — TODO confirm.
# Depending on spark.sql.ansi.enabled, unparseable values either become null or raise.
df = df.withColumn("date", to_date("date"))

In [None]:
# Derive a deterministic item_id for the sort key using Spark built-ins:
# the SHA-224 hex digest of "date-name-status" (concat_ws skips null fields).
item_key_source = concat_ws("-", col("date"), col("name"), col("status"))
df = df.withColumn("item_id", sha2(item_key_source, 224))

In [None]:
# Build the DynamoDB keys:
#   PK = "USER#<TARGET_USER_ID>"   (replaces the CSV's own user id for every row)
#   SK = "LIST#<date>ITEM#<item_id>"
# NOTE(review): there is no '#' between the date and "ITEM#"; presumably this
# matches keys already stored in the table — confirm before changing it.
TARGET_USER_ID = "b4c894d8-3091-70b2-79f6-78ebfd1b527f"

# concat_ws silently skips nulls, so a null date would yield a malformed
# "LIST#ITEM#..." sort key. Fail fast instead of writing such items.
missing_dates = df.filter(col("date").isNull()).count()
assert missing_dates == 0, f"{missing_dates} linha(s) sem data válida; verifique o formato da coluna 'date'"

df = df.withColumn("PK", lit(f"USER#{TARGET_USER_ID}"))
df = df.withColumn("SK", concat_ws("", lit("LIST#"), col("date"), lit("ITEM#"), col("item_id")))
df.show(truncate=False)
print(df.count())

In [None]:
# Serialize each row to a JSON string and bring them all to the driver
# (one str per row; null fields are omitted by toJSON).
json_rows_rdd = df.toJSON()
data_json = json_rows_rdd.collect()

In [None]:
import time
from decimal import Decimal

# Upload every serialized row to the DynamoDB table using boto3's buffered batch writer.
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('MARKET_LIST')

# Crude client-side throttle: pause SLEEP_SECONDS after every SLEEP_EVERY items.
SLEEP_EVERY = 100
SLEEP_SECONDS = 1
# batch_writer sends items in groups of this size (boto3's default flush amount).
BATCH_SIZE = 25

failed_batches = 0
try:
  # overwrite_by_pkeys discards earlier buffered items that share the same PK/SK.
  # Without it, two rows hashing to the same SK inside one group make the whole
  # BatchWriteItem request fail.
  with table.batch_writer(overwrite_by_pkeys=['PK', 'SK']) as batch:
    for i, row in enumerate(data_json):
      # parse_float=Decimal: boto3 rejects Python floats for DynamoDB numbers.
      item = json.loads(row, parse_float=Decimal)
      try:
        batch.put_item(Item=item)
      except Exception as e:
        # put_item only buffers. Errors surface when a group is flushed, so the
        # failure belongs to that group (up to BATCH_SIZE items), not necessarily
        # to `item`. boto3 has already removed that group from its buffer, so it is not retried.
        failed_batches += 1
        print("Erro ao enviar lote (último item adicionado):", item)
        print("→", e)
      if i > 0 and i % SLEEP_EVERY == 0:
        time.sleep(SLEEP_SECONDS)
except Exception as e:
  # Leaving the `with` block flushes the remaining buffer outside the per-item
  # try above, so errors from that final group (or a JSON parse error) land here.
  failed_batches += 1
  print("Erro durante a gravação (ex.: envio do último lote):")
  print("→", e)

if failed_batches:
  print(f"Concluído com {failed_batches} lote(s) com erro (até {BATCH_SIZE} itens cada); {len(data_json)} itens processados.")
else:
  print(f"Sucesso! {len(data_json)} itens processados.")