In [1]:
from typing import Iterator, Dict, Any, Optional
import psycopg2.extras
import psycopg2
import json
import os
import io

In [2]:
def sample(file_path: str, file_name: str, limit=100):
    """Copy the first ``limit`` lines of a file into a ``sample_<limit>`` sub-directory.

    The source is ``file_path/file_name``; the copy is written to
    ``file_path/sample_<limit>/file_name``.

    Args:
        file_path: Directory that contains the source file.
        file_name: Name of the source file; the sample keeps the same name.
        limit: Maximum number of lines to copy. If the source has fewer lines,
            all of them are copied (no error is raised).

    Raises:
        OSError: If the source file cannot be opened or the sample cannot be
            written.
    """
    from itertools import islice  # local import keeps this cell self-contained

    sample_path = os.path.join(file_path, f"sample_{limit}")

    # Open the source first so a missing input fails before any directory is created.
    # UTF-8 is used explicitly so the result does not depend on the platform default.
    with open(os.path.join(file_path, file_name), encoding="utf-8") as file:
        os.makedirs(sample_path, exist_ok=True)

        # "w" (not "a") makes the call idempotent: re-running replaces the sample
        # instead of appending a second copy of the same lines.
        with open(os.path.join(sample_path, file_name), "w", encoding="utf-8") as file_sample:
            # islice stops quietly at end-of-file and streams line by line,
            # so the sample never has to be held in memory as a list.
            file_sample.writelines(islice(file, max(limit, 0)))


In [5]:
# Build head-of-file samples of the raw conversations/authors exports.
# Only the 1,000,000-line samples are active; the commented-out calls are the
# smaller sample sizes (100,000 / 10,000 / 1,000 lines).
# NOTE(review): the absolute Windows path is machine-specific - adjust before running elsewhere.
# sample('C:/Users/Krips/Documents/Programming/PDT/','conversations.jsonl', 100000)
# sample('C:/Users/Krips/Documents/Programming/PDT/','conversations.jsonl', 10000)
# sample('C:/Users/Krips/Documents/Programming/PDT/','conversations.jsonl', 1000)
sample('C:/Users/Krips/Documents/Programming/PDT/','conversations.jsonl', 1000000)
# sample('C:/Users/Krips/Documents/Programming/PDT/','authors.jsonl', 100000)
# sample('C:/Users/Krips/Documents/Programming/PDT/','authors.jsonl', 10000)
# sample('C:/Users/Krips/Documents/Programming/PDT/','authors.jsonl', 1000)
sample('C:/Users/Krips/Documents/Programming/PDT/','authors.jsonl', 1000000)

In [None]:
def clean_csv_value(value: Optional[Any]):
    """Escape a value for a tab-separated PostgreSQL ``COPY ... FROM`` text stream.

    Args:
        value: Any value; it is converted with ``str()`` before escaping.
            ``None`` is rendered as ``\\N``, COPY's default NULL marker.

    Returns:
        The escaped string: NUL characters removed, backslashes doubled, and
        newline / tab / carriage return replaced by their two-character
        backslash escapes.
    """
    if value is None:
        # str(None) would load the literal text 'None' instead of a NULL.
        return r'\N'
    return (
        str(value)
        .replace('\x00', '')      # PostgreSQL text values cannot contain NUL
        # Backslashes must be doubled FIRST; doing it last would also double
        # the backslashes of the escapes produced below and corrupt them.
        .replace('\\', '\\\\')
        .replace('\n', '\\n')
        .replace('\t', '\\t')
        .replace('\r', '\\r')
    )

In [None]:
class Extractor:
    """Maps raw author JSON records onto the columns of the ``authors`` table."""

    # Placeholder for a missing value. It is also the default NULL marker of
    # PostgreSQL's COPY text format, so it must reach COPY unescaped.
    NULL_MARKER = r'\N'

    @staticmethod
    def generate_author_row(_input: Optional[dict] = None):
        """Build one ``authors`` row from a decoded author record.

        Args:
            _input: Decoded JSON object for one author. ``None`` is treated as
                an empty record.

        Returns:
            A dict with the keys ``id``, ``name``, ``username``,
            ``description``, ``followers_count``, ``following_count``,
            ``tweet_count`` and ``listed_count``. Missing or null values are
            replaced by ``Extractor.NULL_MARKER``.
        """
        record = _input or {}
        null = Extractor.NULL_MARKER

        # NOTE(review): the counts are presumably nested under a
        # "public_metrics" object - confirm against the input file. A plain
        # dict lookup of the dotted key "public_metrics.<name>" never reaches
        # into a nested object, so the nested form is tried first and the flat
        # dotted key is kept as a fallback.
        metrics = record.get("public_metrics")
        if not isinstance(metrics, dict):
            metrics = {}

        def field(key):
            value = record.get(key)
            return null if value is None else value

        def metric(key):
            value = metrics.get(key)
            if value is None:
                value = record.get(f"public_metrics.{key}")
            return null if value is None else value

        return {
            "id": field("id"),
            "name": field("name"),
            "username": field("username"),
            "description": field("description"),
            "followers_count": metric("followers_count"),
            "following_count": metric("following_count"),
            "tweet_count": metric("tweet_count"),
            "listed_count": metric("listed_count"),
        }


class PostgreClient:
    """Helpers for creating and bulk-loading the ``authors`` table."""

    @staticmethod
    def create_author_table(cursor) -> None:
        """Drop and recreate the ``authors`` table using the given cursor.

        The caller is responsible for committing.
        """
        cursor.execute(
            """
            DROP TABLE IF EXISTS authors;
            CREATE UNLOGGED TABLE authors (
                id                  bigint PRIMARY KEY,
                name                VARCHAR ( 255 ),
                username            VARCHAR ( 255 ),
                description         text,
                followers_count     integer,
                following_count     integer,
                tweet_count         integer,
                listed_count        integer
            );
        """
        )

    @staticmethod
    def _copy_field(value, text: bool = False) -> str:
        """Render one value as a field of a COPY text line.

        Missing values (``None`` or the ``\\N`` placeholder) are emitted as the
        bare NULL marker. They must bypass escaping, otherwise the backslash is
        doubled and COPY loads the two-character string ``\\N`` instead of NULL.
        The trade-off of the placeholder design is that a text value that is
        literally ``\\N`` is also loaded as NULL.
        """
        if value is None or value == Extractor.NULL_MARKER:
            return Extractor.NULL_MARKER
        return clean_csv_value(value) if text else str(value)

    @staticmethod
    def copy_stringio(connection, authors: Iterator[Dict[str, Any]]) -> None:
        """Bulk-load author rows into ``authors`` with ``COPY`` and commit.

        Args:
            connection: Open psycopg2 connection.
            authors: Rows shaped like the result of
                ``Extractor.generate_author_row``.

        Raises:
            Exception: Whatever the COPY or commit raises; the transaction is
                rolled back first so the connection stays usable.
        """
        field = PostgreClient._copy_field
        csv_file_like_object = io.StringIO()
        for author in authors:
            # Every field is converted to str here: numeric columns may hold
            # ints, which str.join() would reject.
            csv_file_like_object.write('\t'.join((
                field(author["id"]),
                field(author["name"], text=True),
                field(author["username"], text=True),
                field(author["description"], text=True),
                field(author["followers_count"]),
                field(author["following_count"]),
                field(author["tweet_count"]),
                field(author["listed_count"]),
            )) + '\n')
        csv_file_like_object.seek(0)

        try:
            with connection.cursor() as cursor:
                cursor.copy_from(csv_file_like_object, 'authors', sep='\t')
            connection.commit()
        except Exception:
            # Leave the connection out of the aborted-transaction state.
            connection.rollback()
            raise


In [None]:
from datetime import datetime
import json
import time
from pathlib import Path
import csv

# import asyncio
import threading, queue


def author_row_traverse(file_path: str, file_name: str, block_max_size: int = 100_000):
    """Load a JSON-lines authors file into the ``authors`` table.

    The table is dropped and recreated, then the file is read line by line.
    Rows are de-duplicated on ``id`` and bulk-loaded in blocks. Progress is
    printed every 10,000 input lines, and total run time is printed at the end.

    Args:
        file_path: Directory that contains the input file.
        file_name: Name of the JSON-lines file (one author object per line).
        block_max_size: Number of input lines read between two bulk loads.
            The default matches the previous fixed behaviour (10 x 10,000).

    Raises:
        ValueError: If ``block_max_size`` is smaller than 1.
    """
    if block_max_size < 1:
        raise ValueError("block_max_size must be at least 1")

    report_every = 10_000

    def _format_elapsed(delta) -> str:
        # total_seconds() (unlike .seconds) does not wrap around after a day.
        minutes, seconds = divmod(int(delta.total_seconds()), 60)
        return f"{minutes}:{seconds:02d}"

    seen_ids = set()
    duplicates = set()
    block_entries = []
    missing_id = 0

    lines_in_block = 0
    lines_since_report = 0

    import_start_time = datetime.now()
    block_start_time = import_start_time

    # Connection settings can be overridden through the environment; the
    # defaults are the values this notebook has always used.
    # NOTE(review): remove the password default once the environment is set up.
    connection = psycopg2.connect(
        dbname=os.environ.get("PDT_DB_NAME", "PDT"),
        user=os.environ.get("PDT_DB_USER", "postgres"),
        password=os.environ.get("PDT_DB_PASSWORD", "291122"),
        host=os.environ.get("PDT_DB_HOST", "localhost"),
        port=os.environ.get("PDT_DB_PORT", "5432"),
    )

    try:
        with connection.cursor() as cursor:
            PostgreClient.create_author_table(cursor)
        connection.commit()

        with open(os.path.join(file_path, file_name), encoding='utf-8') as file:
            for line in file:
                lines_in_block += 1
                lines_since_report += 1

                # Blank lines (e.g. a trailing newline) are not valid JSON; skip them.
                if line.strip():
                    record = json.loads(line)
                    author_id = record.get("id")
                    if author_id is None:
                        # A row without an id cannot satisfy the primary key and
                        # would abort the whole COPY of its block.
                        missing_id += 1
                    elif author_id in seen_ids:
                        duplicates.add(author_id)
                    else:
                        seen_ids.add(author_id)
                        block_entries.append(Extractor.generate_author_row(record))

                if lines_since_report == report_every:
                    block_end_time = datetime.now()
                    print("-")
                    print(f"time now: {block_end_time.isoformat()}")
                    print(
                        f"block excecution time: {_format_elapsed(block_end_time - block_start_time)}"
                    )
                    block_start_time = block_end_time
                    lines_since_report = 0

                if lines_in_block >= block_max_size:
                    PostgreClient.copy_stringio(connection, block_entries)
                    block_entries = []
                    lines_in_block = 0

        # Flush the final, partially filled block; without this every row read
        # after the last full block would be silently dropped.
        if block_entries:
            PostgreClient.copy_stringio(connection, block_entries)
    finally:
        connection.close()

    print(
        f"rows imported: {len(seen_ids)}, duplicate ids: {len(duplicates)}, rows without id: {missing_id}"
    )
    print(
        f"import excecution time: {_format_elapsed(datetime.now() - import_start_time)}"
    )


In [None]:
# Run the authors import against the full export.
# The commented-out call below targets the 10,000-line sample directory instead.
# author_row_traverse('C:/Users/Krips/Documents/Programming/PDT/sample_10000/', 'authors.jsonl', block_max_size=1_000)
author_row_traverse('C:/Users/Krips/Documents/Programming/PDT/', 'authors.jsonl')

In [None]:
# import psycopg2

# conn = psycopg2.connect(
#     dbname='PDT',
#     user='postgres',
#     password='291122',
#     host='localhost',
#     port='5432'
# )
# with conn.cursor() as cursor:
#     cursor.execute(Extractor.create_author_table())
#     conn.commit()
#     # cursor.execute("SELECT * FROM authors")
#     # print(cursor.fetchone())
    
#     cursor.close()
#     # commit the changes

In [5]:
def func2(_dict):
    """Set ``key2`` and ``key4`` to 1 on the given mapping, in place."""
    for name in ('key2', 'key4'):
        _dict[name] = 1
    
def func1(_dict):
    """Set ``key1`` to 1 in place, then pass the same mapping on to ``func2``."""
    _dict.update({'key1': 1})
    func2(_dict)
    
    
    
if __name__ == '__main__':
    # Scratch check: the dict is passed by reference, so the keys added inside
    # func1 (and, through it, func2) are visible to the caller afterwards.
    _dict = {}
    func1(_dict)
    print(_dict)
    # Prints False because func1 added 'key1'.
    print('key1' not in _dict)

{'key1': 1, 'key2': 1, 'key4': 1}
False


In [None]:
import concurrent.futures

# Ten values repeated 1,000,000 times -> a 10,000,000-element input list.
nums = [1,2,3,4,5,6,7,8,9,10]*1000000

def f(x):
    """Return ``x`` multiplied by itself."""
    squared = x * x
    return squared
def main():
    """Square every item of ``nums`` serially and via a process pool, printing both results."""
    # Serial baseline: confirms map() and f behave as expected.
    serial_squares = list(map(f, nums))
    print(serial_squares)

    # Same mapping through a process pool: confirms the concurrent map works.
    with concurrent.futures.ProcessPoolExecutor() as pool:
        parallel_squares = list(pool.map(f, nums))
        print(parallel_squares)

# Entry-point guard: run main() only when this code executes as __main__.
if __name__ == '__main__':
    main()

In [None]:
import multiprocessing as mp
import psutil

# Ten values repeated 100,000,000 times -> a 1,000,000,000-element list.
# NOTE(review): a list this long likely needs several GB of memory - confirm this size is intended.
nums = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]*100000000


def f(x):
    """Return 1 plus the running sum of every item in the iterable ``x``."""
    total = 1  # the accumulator starts at 1, not 0
    for item in x:
        total += item
    return total


def spawn():
    """Start one ``run_child`` process per selected CPU index and wait for all of them.

    The worker count is ``psutil.cpu_count() - 6 - 8``; each worker receives a
    single-element affinity list holding its own index.
    """
    # NOTE(review): the 6 and 8 look machine-specific; on a host where the
    # result is <= 0 the loop below starts no workers at all.
    n_cpus = psutil.cpu_count() - 6 - 8
    print(n_cpus)

    workers = []
    for core in range(n_cpus):
        worker = mp.Process(target=run_child, kwargs={"affinity": [core]})
        worker.start()
        workers.append(worker)

    # Join in start order, reporting each worker as it finishes.
    for worker in workers:
        worker.join()
        print('joined')


def run_child(affinity):
    """Pin the current process to ``affinity`` and print ``f(nums)``.

    Prints the process id and the CPU affinity before and after the change.

    Args:
        affinity: List of CPU indices passed to ``cpu_affinity``.
    """
    current = psutil.Process()  # no pid given: handle for this process
    print(f'PID: {current.pid}')
    print(f'Affinity before: {current.cpu_affinity()}')
    current.cpu_affinity(affinity)
    print(f'Affinity after: {current.cpu_affinity()}')
    print(f(nums))


# Entry-point guard: start the workers only when this code executes as __main__.
if __name__ == '__main__':
    spawn()


In [16]:
def _row(i):
    if i == 1:
        return
    if i == 0:
        return {'id':i}

# Scratch check: collect _row() results for i = 0..5, then drop the None entries.
i = 0
rows = []
for x in range(6):
    rows.append(_row(i))
    i += 1
    
# Only the i == 0 call yields a dict, so a single row survives the filter.
_rows = [row for row in filter(lambda x: x is not None, rows)]
print(_rows)

[{'id': 0}]


In [18]:
# Scratch check: set membership is case-sensitive, so only the repeated 'PDT' collapses (8 items -> 7).
print(set(['PDT','pdt','pdT','Pdt','pDT','PDt','PdT','PDT']))

{'Pdt', 'pdt', 'PdT', 'PDt', 'pDT', 'pdT', 'PDT'}
