In [11]:
import json
from collections import defaultdict
from minio import Minio
import boto3
import pandas as pd
import pyarrow as pa
import numpy as np
from mypy_boto3_s3.client import S3Client
from mypy_boto3_s3.paginator import ListObjectsV2Paginator
from mypy_boto3_s3.service_resource import S3ServiceResource
from mypy_boto3_s3.service_resource import ServiceResourceBucketsCollection
from mypy_boto3_s3.service_resource import Bucket
from mypy_boto3_s3.service_resource import (
    BucketObjectsCollection,
    Object,
    ObjectSummary,
)
from hashlib import sha256
import tqdm
import base64
from lxml import etree
import re
from lxml.etree import Element
from smart_open import s3 as smart_open_s3
from typing import List, Dict, Any

from src.schemas import MMS, SMS, Call, CorrespondenceBase

from src.utils import replace_null_with_none

In [12]:
# Name of the S3 bucket that is created/opened below and that holds both the
# uploaded XML backups and the extracted MMS attachments (under "parts/").
BUCKET_NAME = "sms-backup-restore"

In [None]:
# Read the object-store credentials from a local JSON file so that no secret
# is hard-coded in the notebook.
with open("credentials.json", "r") as fp:
    creds = json.load(fp)
access_key = creds["accessKey"]
secret_key = creds["secretKey"]

# Service endpoints, named once instead of repeating the literal per handle.
S3_ENDPOINT_URL = "http://192.168.103.69:9000"
DYNAMODB_ENDPOINT_URL = "http://192.168.103.69:8000"

boto3_session = boto3.Session(
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
)
s3_client = boto3_session.client(
    "s3",
    endpoint_url=S3_ENDPOINT_URL,
    verify=False,
)
s3_resource: S3ServiceResource = boto3_session.resource(
    "s3",
    endpoint_url=S3_ENDPOINT_URL,
    verify=False,
)

# NOTE(review): unlike the S3 handles this resource is built from boto3's
# default session, not from `boto3_session` — confirm that is intentional.
dynamodb = boto3.resource(
    "dynamodb",
    endpoint_url=DYNAMODB_ENDPOINT_URL,
    verify=False,
)

# Create the bucket on first run only, so the cell can be re-executed safely.
existing_buckets = {b["Name"] for b in s3_client.list_buckets().get("Buckets", [])}
if BUCKET_NAME not in existing_buckets:
    s3_client.create_bucket(Bucket=BUCKET_NAME)
bucket: Bucket = s3_resource.Bucket(BUCKET_NAME)

# List the bucket once and reuse the result; the listing is materialised into
# a plain list, hence the List annotation rather than the lazy collection type.
bucket_objs: List[ObjectSummary] = list(bucket.objects.all())
backups = [obj for obj in bucket_objs if re.search(r"^[\w\d_-]+\.xml", obj.key)]
backups

In [14]:
def process_tag(elem: Element) -> CorrespondenceBase:
    e_data = replace_null_with_none(dict(elem.attrib))

    match elem.tag:
        case "call":
            return Call.model_validate(e_data)
        case "sms":
            return SMS.model_validate(e_data)
        case "mms":
            parts = [
                replace_null_with_none(dict(part.attrib))
                for part in elem.findall(".//part")
            ]
            parts1 = []
            for part in parts:
                if part["ct"] not in ["application/smil", "text/plain"] and bool(
                    part["data"]
                ):
                    data = base64.b64decode(part["data"])
                    data_sha256 = sha256(data).hexdigest()
                    key = f"parts/{data_sha256}"
                    try:
                        s3_client.head_object(Bucket=BUCKET_NAME, Key=key)
                    except Exception:
                        bucket.put_object(Body=data, Key=key, ContentType=part["ct"])

                    part["data"] = data_sha256
                    assert part["data"] == data_sha256
                parts1.append(part)

            addrs = [
                replace_null_with_none(dict(addr.attrib))
                for addr in elem.findall(".//addr")
            ]
            e_data.update({"parts": parts1, "addrs": addrs})

            return MMS.model_validate(e_data)
        case _:
            pass

In [15]:
def process_backup(
    s3_client: S3Client, bucket_name: str, backup_key: str
) -> Dict[str, List[Dict[str, Any]]]:
    """Stream one XML backup from S3 and collect its records grouped by tag.

    Args:
        s3_client: Client used by ``smart_open`` to read the object.
        bucket_name: Bucket that holds the backup.
        backup_key: Object key of the XML backup.

    Returns:
        A mapping from element tag to the list of dumped models for that tag.
        Each entry is the model's ``model_dump()`` plus an ``id`` key holding
        the model hash reduced to an unsigned 32-bit integer.
    """
    fin: smart_open_s3.Reader = smart_open_s3.open(
        bucket_name,
        backup_key,
        mode="rb",
        defer_seek=True,
        client=s3_client,
    )

    tag_data: Dict[str, List[Dict[str, Any]]] = {}
    try:
        seekable_reader = fin._raw_reader

        # iterparse yields "end" events by default, so the very first event is
        # already a completed child element. It must not be skipped, otherwise
        # a leading record is silently dropped. Tags that are not records are
        # filtered out by process_tag returning None.
        context = etree.iterparse(seekable_reader, recover=True, encoding="utf-8")

        for _event, elem in context:
            tag_parsed = process_tag(elem=elem)
            if not isinstance(tag_parsed, CorrespondenceBase):
                continue

            # NOTE(review): ids are only stable across runs if the model's
            # __hash__ is deterministic — confirm in src.schemas.
            record_id = np.array(tag_parsed.__hash__()).astype(np.uint32).item()
            tag_data.setdefault(elem.tag, []).append(
                {"id": record_id, **tag_parsed.model_dump()}
            )

            # The record is fully extracted; drop its attributes and children
            # so large base64 payloads do not accumulate in the parsed tree.
            elem.clear()
    finally:
        # Release the S3 stream even when parsing or validation fails.
        fin.close()

    return tag_data

In [None]:
import redis
from redis.commands.json.path import Path


r = redis.Redis(host="localhost", port=6379)

# Parse every backup and store each record as a RedisJSON document keyed
# "<element type>:<id>".
with tqdm.tqdm(backups) as t:
    for backup in t:
        t.set_description(f"processing {backup.key}")
        processed_backup = process_backup(
            s3_client=s3_client, bucket_name=bucket.name, backup_key=backup.key
        )

        # Queue all writes for this backup and send them in one round trip;
        # the context manager resets the pipeline even if a write fails.
        with r.pipeline() as pipe:
            for element_type, elements in processed_backup.items():
                for e in elements:
                    pipe.json().set(
                        f'{element_type}:{e["id"]}', Path.root_path(), e
                    )
            pipe.execute()

In [None]:
# Fetch every stored MMS document in a single pipelined round trip and load
# the results into a DataFrame.
pipe = r.pipeline()
for k in r.keys("mms:*"):
    pipe.json().get(k)
mms = pipe.execute()
df = pd.DataFrame(mms)
df

In [None]:
# Fields shared by every correspondence type.
# NOTE(review): "EDT" is an abbreviation, not an IANA zone name — confirm the
# Arrow timezone database in use can resolve it.
common_schema = pa.schema(
    [
        pa.field("date", pa.timestamp(unit="ms", tz="EDT")),
        pa.field("readable_date", pa.string()),
        pa.field("contact_name", pa.string()),
    ]
)
calls_schema = pa.unify_schemas(
    [
        common_schema,
        pa.schema(
            [
                pa.field("number", pa.string()),
                pa.field("duration", pa.uint32()),
                pa.field("type", pa.uint8()),
                pa.field("presentation", pa.uint8()),
                pa.field("subscription_id", pa.string()),
                pa.field("post_dial_digits", pa.string()),
                pa.field("subscription_component_name", pa.string()),
            ]
        ),
    ]
)

# `tag_data` only exists inside process_backup; at notebook level the parsed
# records live in `processed_backup` (the most recently processed backup).
df = pd.DataFrame(
    data=processed_backup.get("call"), columns=[field.name for field in calls_schema]
)
df["date"] = df["date"].apply(lambda x: pd.Timestamp(int(x), unit="ms"))
# Set dtypes for each column based on the schema
df = df.astype({field.name: pd.ArrowDtype(field.type) for field in calls_schema})

table = pa.Table.from_pandas(df, schema=calls_schema)
table.schema

In [22]:
# SMS-specific fields, given as (name, type) pairs; they are merged with the
# shared fields of `common_schema` to form the full SMS schema.
_ts_ms_edt = pa.timestamp(unit="ms", tz="EDT")
sms_schema_unique = pa.schema(
    [
        ("protocol", pa.string()),
        ("address", pa.string()),
        ("type", pa.string()),
        ("subject", pa.null()),
        ("body", pa.string()),
        ("toa", pa.null()),
        ("sc_toa", pa.null()),
        ("service_center", pa.string()),
        ("read", pa.string()),
        ("status", pa.string()),
        ("locked", pa.string()),
        ("date_sent", _ts_ms_edt),
        ("sub_id", pa.string()),
        ("contact_name", pa.string()),
    ]
)
sms_schema = pa.unify_schemas([common_schema, sms_schema_unique])

In [None]:
# `tag_data` only exists inside process_backup; at notebook level the parsed
# records live in `processed_backup` (the most recently processed backup).
df = pd.DataFrame(data=processed_backup.get("sms"))
# Both columns hold epoch values that are interpreted as milliseconds.
df["date"] = df["date"].apply(lambda x: pd.Timestamp(int(x), unit="ms"))
df["date_sent"] = df["date_sent"].apply(lambda x: pd.Timestamp(int(x), unit="ms"))

df

In [None]:
def _flag_to_bool(value) -> bool:
    """Interpret a flag value as a boolean.

    Strings are False only when empty or "0" (plain ``bool("0")`` would be
    True); every other value falls back to Python truthiness.
    """
    if isinstance(value, str):
        return value.strip() not in ("", "0")
    return bool(value)


# `tag_data` only exists inside process_backup; at notebook level the parsed
# records live in `processed_backup` (the most recently processed backup).
mms_df = pd.DataFrame(data=processed_backup.get("mms"))
mms_df["date"] = mms_df["date"].apply(lambda x: pd.Timestamp(int(x), unit="ms"))
mms_df["date_sent"] = mms_df["date_sent"].apply(
    lambda x: pd.Timestamp(int(x), unit="ms")
)
mms_df["seen"] = mms_df["seen"].apply(_flag_to_bool)
mms_df["msg_box"] = mms_df["msg_box"].astype(int)
# MMS-specific fields; merged with the shared fields of `common_schema` below.
mms_schema_unique = pa.schema(
    [
        pa.field("rr", pa.string()),
        pa.field("sub", pa.string()),
        pa.field("ct_t", pa.string()),
        pa.field("read_status", pa.null()),
        pa.field("seen", pa.bool_()),
        pa.field("msg_box", pa.uint8()),
        pa.field("address", pa.string()),
        pa.field("sub_cs", pa.string()),
        pa.field("resp_st", pa.string()),
        pa.field("retr_st", pa.null()),
        pa.field("d_tm", pa.null()),
        pa.field("text_only", pa.string()),
        pa.field("exp", pa.string()),
        pa.field("locked", pa.string()),
        pa.field("m_id", pa.string()),
        pa.field("st", pa.null()),
        pa.field("retr_txt_cs", pa.null()),
        pa.field("retr_txt", pa.null()),
        pa.field("creator", pa.string()),
        pa.field("date_sent", pa.timestamp(unit="ms", tz="EDT")),
        pa.field("read", pa.string()),
        pa.field("m_size", pa.string()),
        pa.field("rpt_a", pa.null()),
        pa.field("ct_cls", pa.null()),
        pa.field("pri", pa.string()),
        pa.field("sub_id", pa.string()),
        pa.field("tr_id", pa.string()),
        pa.field("resp_txt", pa.null()),
        pa.field("ct_l", pa.string()),
        pa.field("m_cls", pa.string()),
        pa.field("d_rpt", pa.string()),
        pa.field("v", pa.string()),
        pa.field("m_type", pa.string()),
        pa.field(
            "parts",
            pa.list_(
                pa.struct(
                    [
                        pa.field("cd", pa.string()),
                        pa.field("chset", pa.string()),
                        pa.field("cid", pa.string()),
                        pa.field("cl", pa.string()),
                        pa.field("ct", pa.string()),
                        pa.field("ctt_s", pa.string()),
                        pa.field("ctt_t", pa.string()),
                        pa.field("fn", pa.string()),
                        pa.field("seq", pa.string()),
                        pa.field("text", pa.string()),
                    ]
                )
            ),
        ),
        pa.field(
            "addrs",
            pa.list_(
                pa.struct(
                    [
                        pa.field("address", pa.string()),
                        pa.field("type", pa.string()),
                        pa.field("charset", pa.string()),
                    ]
                )
            ),
        ),
    ]
)
mms_schema = pa.unify_schemas([common_schema, mms_schema_unique])

table = pa.Table.from_pandas(mms_df, schema=mms_schema)
table

In [None]:
# Scratch helper: prints one `pa.field(...)` line per inferred field so the
# result can be pasted into a schema definition.
# NOTE(review): `mms_df["parts"]` is a Series while Table.from_pandas is
# documented for DataFrames — confirm this cell still runs as intended.
table = pa.Table.from_pandas(mms_df["parts"])
field_lines = [
    f'pa.field("{field.name}", pa.{field.type}()),' for field in table.schema
]
for line in field_lines:
    print(line)

In [None]:
import os
import glob


def upload_file_s3(bucket: Bucket, file_path: str):
    """Upload a local file to ``bucket`` under its base name as XML content.

    Args:
        bucket: Destination bucket; its ``upload_fileobj`` method is used.
        file_path: Path of the local file. Only the final path component is
            used as the object key.
    """
    object_key = os.path.basename(file_path)
    extra_args = {"ContentType": "application/xml"}
    with open(file_path, "rb") as source:
        bucket.upload_fileobj(source, Key=object_key, ExtraArgs=extra_args)


# Push every local XML backup into the bucket, with a progress bar.
backup_paths = glob.glob("./backups/*.xml")
for file in tqdm.tqdm(backup_paths):
    upload_file_s3(bucket, file)

In [12]:
DYNAMODB_TABLE_NAME = "sms-backup-restore"

# Create the table on first run; on a re-run the table already exists, so
# fall back to a handle on the existing one instead of failing.
try:
    table = dynamodb.create_table(
        TableName=DYNAMODB_TABLE_NAME,
        KeySchema=[
            {"AttributeName": "id", "KeyType": "HASH"},  # Partition_key
            {"AttributeName": "timestamp", "KeyType": "RANGE"},  # Sort_key
        ],
        AttributeDefinitions=[
            {"AttributeName": "id", "AttributeType": "N"},
            {"AttributeName": "timestamp", "AttributeType": "S"},
        ],
        ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
    )
except dynamodb.meta.client.exceptions.ResourceInUseException:
    table = dynamodb.Table(DYNAMODB_TABLE_NAME)

# Table creation is asynchronous; wait until it can accept writes.
table.wait_until_exists()

# NOTE(review): the key schema requires a string "timestamp" attribute on
# every item — confirm the dumped models actually provide one.
for element_type, elements in processed_backup.items():
    for e in elements:
        table.put_item(Item=e)

In [None]:
from itertools import batched, chain

serializer = boto3.dynamodb.types.TypeSerializer()


def serialize_dynamodb(x):
    """Return ``x`` with every value converted to DynamoDB's low-level format.

    Kept for inspecting the wire format only. Do not apply it to items passed
    to the resource API below, which performs this conversion itself.
    """
    return {k: serializer.serialize(v) for k, v in x.items()}


# `processed_backup` maps element type -> list of records. Iterating it
# directly would yield the type names, so flatten the record lists first.
all_elements = chain.from_iterable(processed_backup.values())

# Groups of 25 put requests per batch_write_item call.
for batch in tqdm.tqdm(batched(all_elements, 25)):
    # The resource-level call serialises plain Python values on its own, so
    # the items are passed as-is (pre-serialising would nest the type tags).
    put_requests = [{"PutRequest": {"Item": e}} for e in batch]
    response = dynamodb.batch_write_item(
        RequestItems={"sms-backup-restore": put_requests}
    )
    print(response)

In [None]:
# Report the table's current status as seen by the resource handle.
print("Table status:", table.table_status)
deserializer = boto3.dynamodb.types.TypeDeserializer()
#python_data = {k: deserializer.deserialize(v) for k, v in low_level_data.items()}

# To go from python to low-level format
# Size, in characters, of the last batch of put requests encoded as JSON.
put_requests_json = json.dumps(put_requests)
print(len(put_requests_json))