In [20]:
import pandas as pd
import numpy as np
import sqlalchemy
import config as c
from typing import NamedTuple, Optional
import re
from pymystem3 import Mystem
from collections import defaultdict
from nltk import wordpunct_tokenize

In [21]:
class FileNames(NamedTuple):
    """Paths of the input files consumed by main().

    Only ``main`` is required; the other two fields default to files under
    ``data/undone/``.
    """
    # main table; main() loads it with read_main_file() (tab-separated .csv, otherwise Excel)
    main: str
    # informant table; main() loads it with read_sup_file()
    informators: str = "data/undone/tblInformators.csv"
    # collector table; main() loads it with read_sup_file()
    collectors: str = "data/undone/tblSobirately.csv"

In [22]:
class GeoNames(NamedTuple):
    """One district name and one region name applied to every village of a dataset.

    Field order matters: produce_place_df() unpacks the tuple positionally as
    ``district, region``.
    """
    # written to the "map_district" column by produce_place_df()
    district: str
    # written to the "map_region" column by produce_place_df()
    region: str

In [23]:
class GeoTables(NamedTuple):
    """Bundle of the three geography frames returned by produce_region_district()."""
    # the place frame, passed through unchanged
    villages: pd.DataFrame
    # unique region names in a single "region_name" column
    regions: pd.DataFrame
    # unique district names in a single "district_name" column
    districts: pd.DataFrame

In [24]:
class JoinedTable():
    """Local copy of one database table plus the rows waiting to be appended to it.

    The table is read once on construction. ``last_index`` remembers the
    highest ``id`` present at that moment; every row whose ``id`` is larger is
    treated as "new", and only those rows are written back by :meth:`upload`.
    """

    # The engine annotation is quoted so it is not evaluated when the class is defined.
    def __init__(self, tablename: str, engine: "sqlalchemy.engine.Engine"):
        """Read ``tablename`` from the ``c.DB`` schema through ``engine``."""
        self.engine: sqlalchemy.engine.Engine = engine
        self.tablename: str = tablename
        self.dataframe: pd.DataFrame = pd.read_sql_table(
            self.tablename,
            schema=c.DB,
            con=self.engine
        )
        if self.dataframe.shape[0] > 0:
            # Use the largest id rather than the id of the last row: the rows
            # are read without an explicit ordering.
            self.last_index = self.dataframe["id"].max()
        else:
            self.last_index = 0

    @property
    def df(self) -> pd.DataFrame:
        """Short alias for ``self.dataframe``."""
        return self.dataframe

    def _new_mask(self):
        """Positional boolean mask of the rows whose id is above ``last_index``."""
        # A plain array avoids label alignment; the index holds repeated labels
        # after join(), because rows are concatenated without re-indexing.
        return (self.dataframe["id"] > self.last_index).to_numpy()

    def drop_duplicates(self, *args, **kwargs):
        """Drop duplicated rows from the local copy.

        Arguments are forwarded to ``DataFrame.drop_duplicates``. When no
        ``subset`` is given, rows are compared on every column whose name does
        not contain ``"id"``.

        :return: ``self`` (for chaining) unless ``inplace=True`` is passed, in
            which case nothing is returned.
        """
        kwargs["inplace"] = (
            kwargs.get("inplace") or False
        )
        kwargs["subset"] = (
            kwargs.get("subset")
            or [col for col in self.dataframe.columns if "id" not in col]
        )
        if not kwargs["inplace"]:
            self.dataframe = self.dataframe.drop_duplicates(*args, **kwargs)
            return self
        self.dataframe.drop_duplicates(*args, **kwargs)

    def join(self, new_dataframe: pd.DataFrame):
        """Append ``new_dataframe`` to the local copy and return ``self``.

        If the frame has no ``id`` column, consecutive ids starting at
        ``last_index + 1`` are inserted as its first column. The insertion is
        done on the frame that was passed in, so the caller sees the new column.
        """
        if "id" not in new_dataframe.columns:
            first_new_id = self.last_index + 1
            new_dataframe.insert(
                loc=0,
                column="id",
                value=list(range(first_new_id, first_new_id + new_dataframe.shape[0]))
            )
        # DataFrame.append was removed in pandas 2.0; concat is the replacement.
        self.dataframe = pd.concat([self.dataframe, new_dataframe])
        return self

    @property
    def new_indices(self) -> pd.Series:
        """Ids of the rows added since the table was loaded."""
        return self.dataframe.loc[self._new_mask(), "id"]

    @property
    def new_entries(self) -> pd.DataFrame:
        """Rows added since the table was loaded (empty when there are none)."""
        # Selecting by mask instead of ``iloc[-n:]``: with n == 0 that slice
        # is ``iloc[-0:]``, which returns the whole table.
        return self.dataframe.loc[self._new_mask()]

    def upload(self):
        """Append the new rows to the database table; does nothing when there are none."""
        if self.new_indices.shape[0] == 0:
            print(f"nothing to insert in {self.tablename}")
            return
        self.new_entries.to_sql(
            name=self.tablename,
            con=self.engine,
            schema=c.DB,
            index=False,
            if_exists="append"
        )
        print(f"upload to {self.tablename} complete")


In [25]:
class JoinedRelation(JoinedTable):
    """JoinedTable for link tables: new rows are pairs of ids produced by a merge."""

    def __init__(
        self,
        tablename: str,
        engine: sqlalchemy.engine.Engine,
    ):
        """Load the link table exactly as the parent class does."""
        super().__init__(tablename, engine)

    def join(
        self,
        main_dataframe: pd.DataFrame,
        relation_dataframe: pd.DataFrame,
        on: tuple = ("value", "code"),
        suffixes: tuple = ("_x", "_y")
    ):
        """Match the two frames and append the resulting id pairs.

        :param main_dataframe: left frame; matched on column ``on[0]``
        :param relation_dataframe: right frame; matched on column ``on[1]``
        :param on: pair of key column names (left, right)
        :param suffixes: suffixes the merge gives to the two overlapping ``id``
            columns; they also determine the names of the two columns kept
        :return: ``self``
        """
        # Default (inner) merge: rows without a counterpart are left out.
        matched = pd.merge(
            left=main_dataframe,
            right=relation_dataframe,
            left_on=[on[0]],
            right_on=[on[1]],
            suffixes=suffixes,
        )
        left_id = "id" + suffixes[0]
        right_id = "id" + suffixes[1]
        id_pairs = matched[[left_id, right_id]]
        return super().join(id_pairs)


In [26]:
def read_main_file(filename: str):
    """Load the main table and discard the columns the import does not use.

    Files ending in ``.csv`` are read as tab-separated text; anything else is
    handed to ``pd.read_excel``. The columns "zvuk2", "photo", "photo2",
    "термины" and "аннотация" must be present and are removed.
    """
    unused_columns = ["zvuk2", "photo", "photo2", "термины", "аннотация"]
    is_csv = filename.endswith(".csv")
    table = pd.read_csv(filename, sep="\t") if is_csv else pd.read_excel(filename)
    return table.drop(columns=unused_columns)

def read_sup_file(filename: str):
    """Load a supplementary table (main() uses it for informants and collectors) from a tab-separated file."""
    separator = "\t"
    return pd.read_csv(filename, sep=separator)

In [27]:
def produce_text_df(origin: pd.DataFrame, leader="АБМ"):
    """Build a frame laid out like the text template from the main table.

    "текст" is copied to ``raw_text`` and "год" to ``year``; ``leader`` is set
    to the given value on every row. All other template columns stay empty,
    and missing values are finally replaced with ``None``.
    """
    template_columns = [
        "old_id", "year", "leader", "address", "raw_text",
        "old_genre", 'geo_id', "genre", "pdf",
    ]
    text_df = pd.DataFrame(columns=template_columns)
    text_df[["raw_text", "year"]] = origin[["текст", "год"]]
    text_df["leader"] = leader
    with_nan = text_df.fillna(np.nan)
    return with_nan.replace([np.nan], [None])

In [28]:
def produce_place_df(
    main_file: pd.DataFrame,
    inform_file: pd.DataFrame,
    geonames: GeoNames = None
) -> pd.DataFrame:
    """List the distinct villages of both tables under one district and region.

    The "село" columns of the two frames are combined, stripped, and have
    double spaces collapsed once. Each distinct name becomes one row.

    :param geonames: ``(district, region)`` pair written to every row
    :raises ValueError: when ``geonames`` is missing
    :return: frame with columns ``village_name``, ``map_region``, ``map_district``
    """
    if not geonames:
        raise ValueError("Names for district and region are missing")
    district_name, region_name = geonames

    raw_names: pd.Series = pd.concat([main_file["село"], inform_file["село"]])
    clean_names = raw_names.str.strip().str.replace("  ", " ")

    places = pd.DataFrame(columns=["village_name", "map_region", "map_district"])
    places["village_name"] = clean_names.unique()
    places["map_region"] = region_name
    places["map_district"] = district_name
    return places

def produce_place_df_complex(main_file: pd.DataFrame, inform_file: pd.DataFrame):
    """Split "region, district, village" addresses of the main table into columns.

    Each value of ``main_file["село"]`` is stripped, has double spaces
    collapsed once, and is split on ``", "`` into at most three parts. The
    abbreviations "обл." and "р-н" are expanded to "область" and "район".
    For an address with only two parts, the second part is taken as the
    village and the district is left missing.

    :param main_file: frame with a "село" column
    :param inform_file: not used; kept so the call matches produce_place_df()
    :return: one row per input row, with columns ``village_name``,
        ``map_region``, ``map_district`` and the index of ``main_file``
    """
    addresses = main_file["село"].str.strip().str.replace("  ", " ", regex=False)

    # n=2 caps the split at three parts. reindex guarantees three columns even
    # when no address has all of them; object dtype keeps ``.str`` usable on a
    # column that is entirely missing.
    parts = (
        addresses.str.split(", ", n=2, expand=True)
        .reindex(columns=range(3))
        .astype(object)
    )
    parts.columns = ["map_region", "map_district", "village_name"]

    # regex=False: the dot in "обл." is a literal dot, not "any character".
    region = parts["map_region"].str.replace("обл.", "область", regex=False)
    district = parts["map_district"].str.replace("р-н", "район", regex=False)
    village = parts["village_name"]

    # Vectorised fix-up. Writing to the rows yielded by iterrows() only
    # changes copies and never reaches the frame.
    two_part = village.isna()
    village = village.where(~two_part, district)
    district = district.where(~two_part)

    return pd.DataFrame(
        {"village_name": village, "map_region": region, "map_district": district}
    )
        


In [29]:
def produce_region_district(place_df: pd.DataFrame):
    """Derive the region and district lookup frames from a place frame.

    Missing names are skipped. The remaining names are stripped, have double
    spaces collapsed once, and are de-duplicated.

    :return: GeoTables holding ``place_df`` itself, a one-column
        ``region_name`` frame and a one-column ``district_name`` frame
    """
    def unique_names(column):
        present = place_df[column].dropna()
        cleaned = present.apply(lambda name: name.strip().replace("  ", " "))
        return cleaned.unique()

    region_df = pd.DataFrame(unique_names("map_region"), columns=["region_name"])
    district_df = pd.DataFrame(unique_names("map_district"), columns=["district_name"])
    return GeoTables(villages=place_df, regions=region_df, districts=district_df)

In [30]:
def produce_geo_df(
    vil_df: pd.DataFrame,
    reg_df: pd.DataFrame,
    dis_df: pd.DataFrame
) -> pd.DataFrame:
    """Resolve each village's region and district names to their ids.

    Use after the dfs have been concatenated with tables from the server
    Produced table should be concatenated with g_geo_people && g_geo_text

    :param vil_df: villages with ``id``, ``map_region`` and ``map_district``
    :param reg_df: regions with ``id`` and ``region_name``
    :param dis_df: districts with ``id`` and ``district_name``
    :return: frame indexed like ``vil_df`` with columns ``id_region``,
        ``id_district``, ``id_village``
    :raises AssertionError: when a frame has no ``id`` column, or when a
        village does not resolve to exactly one region and one district
    """
    assert all("id" in frame.columns for frame in (vil_df, reg_df, dis_df)), \
        "Every frame needs an id column; join it with its table first"

    def lookup_ids(names_df, village_col, name_col, kind):
        # Left merge keeps one output row per village, in village order,
        # provided the names in names_df are unique.
        merged = pd.merge(
            vil_df, names_df, how="left",
            left_on=[village_col], right_on=[name_col],
            suffixes=("_village", "_" + kind),
        )
        ids = merged["id_" + kind]
        assert merged.shape[0] == vil_df.shape[0] and ids.notna().all(), \
            f"every village must match exactly one {kind}"
        return ids

    region_ids = lookup_ids(reg_df, "map_region", "region_name", "region")
    district_ids = lookup_ids(dis_df, "map_district", "district_name", "district")

    # The merge results carry a fresh 0..n-1 index, so combine the columns by
    # position. Assigning them as Series would align on labels and scramble or
    # blank the ids whenever vil_df has any other index.
    return pd.DataFrame(
        {
            "id_region": region_ids.to_numpy(),
            "id_district": district_ids.to_numpy(),
            "id_village": vil_df["id"].to_numpy(),
        },
        index=vil_df.index,
    )

In [31]:
def get_geo_ids(main_file: pd.DataFrame, place_df: pd.DataFrame, geo_df: pd.DataFrame):
    """Find the geo id for every row of the main table.

    The village is the last ``", "``-separated part of "село". It is looked up
    by name in ``place_df`` to get the village id, which is then looked up in
    ``geo_df["id_village"]`` to get the geo id. ``main_file`` is not modified.

    :param main_file: frame with a "село" column
    :param place_df: villages with ``id`` and ``village_name``
    :param geo_df: geo rows with ``id`` and ``id_village``
    :return: Series of geo ids, one per row of ``main_file`` in the same
        order, on a fresh 0..n-1 index; rows without a match are missing
    :raises AssertionError: when ``geo_df`` has no ``id`` column, or when a
        lookup matched a row more than once
    """
    assert "id" in geo_df.columns, "Geo df and geo table need to be joined first"

    # Work on a copy of the column so the caller's frame keeps its addresses.
    village_names = main_file["село"].str.split(", ").str[-1].str.strip()
    texts = main_file.assign(**{"село": village_names})

    with_village = pd.merge(
        texts, place_df, how='left', left_on=["село"], right_on=["village_name"]
    )
    with_geo = pd.merge(
        with_village, geo_df, how='left', left_on=["id"], right_on=["id_village"],
        suffixes=("_old", "_geo")
    )
    # A left merge only grows when a key matches several rows on the right.
    assert with_geo.shape[0] == main_file.shape[0]
    return with_geo["id_geo"]

In [32]:
def get_year(bio):
    """Extract the birth year from the first 20 characters of a biography.

    Looks for four digits followed by spaces and a "г. р." style marker.

    :param bio: biography text; anything that is not a string (for example
        the NaN pandas uses for an empty cell) yields ``None``
    :return: the year as ``int``, or ``None`` when no year is found
    """
    if not isinstance(bio, str):
        return None
    # The dots are unescaped on purpose of compatibility: each matches any one
    # character, so the marker may be written with other punctuation.
    years = re.findall("([0-9]{4}) +г. *р.", bio[:20])
    return int(years[0]) if years else None

In [33]:
def g_name(name, analyzer=None):
    """Guess a person's gender from the grammatical gender of their name parts.

    Parenthesised fragments are removed, the rest is analysed, and the first
    analysis of every token is checked for a feminine (",жен,") or masculine
    (",муж,") tag.

    :param name: full name; a non-string value yields ``None``
    :param analyzer: object with an ``analyze(text)`` method returning a list
        of token dicts; defaults to the module-level ``m``
    :return: ``'f'`` or ``'m'`` when only one gender was seen, otherwise ``None``
    """
    if not isinstance(name, str):
        return None
    if analyzer is None:
        analyzer = m
    # Raw string: "\(" is not a valid escape in an ordinary string literal.
    name = re.sub(r"\(.*?\)", " ", name)
    res = analyzer.analyze(name)
    fem, mal = 0, 0
    if len(res) == 6:
        # NOTE(review): presumably three words plus separators, with the
        # leading surname skipped -- confirm against the analyzer's output.
        res = res[2:]
    for token in res:
        analysis = token.get('analysis')
        if not analysis:
            # separators carry no analysis; unknown words have an empty one
            continue
        grammar = analysis[0].get('gr', '')
        if ',жен,' in grammar:
            fem += 1
        elif ',муж,' in grammar:
            mal += 1
    if fem > 0 and mal == 0:
        return 'f'
    elif fem == 0 and mal > 0:
        return 'm'
    else:
        return None

In [34]:
def produce_inform_df(origin: pd.DataFrame, place_df: pd.DataFrame):
    """Build the informant frame from the informant source table.

    "инициалы", "ФИО" and "биография" are copied to ``code``, ``name`` and
    ``bio``. The village in "село" is looked up by name in ``place_df``; the
    matching village, region and district fill the ``current_*`` columns and,
    identically, the ``birth_*`` columns. ``gender`` comes from g_name() and
    ``birth_year`` from get_year(). The remaining columns stay empty.

    :param origin: informant table
    :param place_df: places with ``village_name``, ``map_region``, ``map_district``
    :return: frame with one row per row of ``origin``
    """
    df = pd.DataFrame(
        columns=["old_id", "code", "name", "gender", "birth_year",
                 "bio", "current_region", "current_district",
                 "current_village", "birth_region", "birth_district",
                 "birth_village", "current_geo_id", "birth_geo_id"]
    )
    df[["code", "name", "bio"]] = origin[["инициалы", "ФИО", "биография"]]

    # One lookup row per village name. A repeated name would multiply rows in
    # the left merge and shift every following informant onto the wrong place.
    lookup = place_df.drop_duplicates(subset=["village_name"])
    places = pd.DataFrame(origin["село"]).merge(
        lookup, left_on=["село"], right_on=["village_name"], how="left"
    )

    # Assign column by column and by position. The merge result has its own
    # 0..n-1 index, and picking the same label twice on the right-hand side
    # of a multi-column assignment is not reliable across pandas versions.
    place_columns = (
        ("village", "village_name"),
        ("region", "map_region"),
        ("district", "map_district"),
    )
    for prefix in ("current", "birth"):
        for target, source in place_columns:
            df[f"{prefix}_{target}"] = places[source].to_numpy()

    df["gender"] = origin["ФИО"].apply(g_name)
    df["birth_year"] = origin["биография"].apply(get_year)

    return df


In [35]:
def produce_collector_df(origin: pd.DataFrame):
    """Build the collector frame: "Номер" -> old_id, "Код собирателя" -> code, "ФИО собирателя" -> name."""
    source_by_target = {
        "old_id": "Номер",
        "code": "Код собирателя",
        "name": "ФИО собирателя",
    }
    targets = list(source_by_target)
    sources = list(source_by_target.values())
    collectors = pd.DataFrame(columns=targets)
    collectors[targets] = origin[sources]
    return collectors

In [36]:
def produce_text_pivot(origin: pd.DataFrame, text_table: JoinedTable, stub:str):
    """Turn the columns whose name contains ``stub`` into (id, value) rows.

    aggregates columns собиратель% or информант%

    The ids of the newly added texts are attached to the rows of ``origin``,
    the selected columns are melted into long form, and empty cells are dropped.

    :return: frame with columns ``id`` and ``value``
    :raises AssertionError: when ``text_table`` has no new rows
    """
    new_ids = text_table.new_indices
    assert new_ids.shape[0] > 0

    stub_columns = [name for name in origin.columns if stub in name]
    wide: pd.DataFrame = origin[stub_columns]
    # new_ids is a Series, so the ids are matched to rows by index label
    wide.insert(0, column="id", value=new_ids)

    melted = pd.melt(wide, id_vars=["id"], value_vars=stub_columns)
    return melted.dropna(axis=0).drop(["variable"], axis=1)

In [37]:
def produce_list_pivot(main_file: pd.DataFrame, text_table: "JoinedTable", key: str="ключевые слова"):
    """Expand a comma-separated list column into one (id, value) row per item.

    The ids of the newly added texts are paired with the rows of
    ``main_file[key]`` by position. Cells that are not strings are skipped.
    Items are split on ``", "``, stripped, and empty items are left out.

    :return: frame with columns ``id`` and ``value``; the columns are present
        even when there is nothing to list
    :raises AssertionError: when ``text_table`` has no new rows
    """
    ids = text_table.new_indices
    assert ids.shape[0] > 0
    observations = []
    for idx, word_list in zip(ids, main_file[key]):
        if not isinstance(word_list, str):
            continue
        words = (word.strip() for word in word_list.split(", "))
        observations += [{"id": idx, "value": word} for word in words if word]
    # Naming the columns keeps them on an empty result, so a later merge on
    # "value" does not fail with a missing key.
    return pd.DataFrame.from_records(observations, columns=["id", "value"])

In [38]:
def parse_questions(x):
    """Parse a free-form question reference into a sorted list of labels.

    A digit glued to a following letter is separated first ("12а" -> "12 а").
    The text is then tokenised. A number starts a new question, and the
    alphabetic tokens after it are its sub-questions. The separators ",", ".",
    ";" and ".," are ignored, and any other token is printed and skipped.

    :param x: question reference as text
    :return: labels of the form "N" (question without sub-question) or
        "N letter", sorted as strings
    """
    # Raw strings: "\g" is not a valid escape in an ordinary string literal.
    x = re.sub(r"([0-9]+)([а-яa-z])", r"\g<1> \g<2>", x)
    result = defaultdict(list)
    cur_num = 0
    for token in wordpunct_tokenize(x):
        if token.isdigit():
            # the previous number had no sub-question: record it on its own
            if cur_num not in result and cur_num != 0:
                result[cur_num].append("")
            cur_num = int(token)
        elif token in {",", ".", ";", ".,"}:
            continue
        elif token.isalpha():
            if token not in result[cur_num]:
                result[cur_num].append(token)
            elif token == "доп":
                # NOTE(review): reached only when "доп" repeats under the same
                # number; it is then filed under question 0 -- confirm intent.
                result[0].append(token)
        else:
            print(token)
    # same bookkeeping for the last number of the text
    if cur_num not in result and cur_num != 0:
        result[cur_num].append("")

    final = [
        str(number) + (" " + sub if sub else "")
        for number, subs in result.items()
        for sub in subs
    ]
    return sorted(final)

In [39]:
def produce_question_pivot(main_file: pd.DataFrame, text_table: JoinedTable):
    """List the questions of every new text as (id, value) rows.

    For each row, "вопрос" is parsed with parse_questions() and every label is
    prefixed with the row's "программа" and a dash. The per-row lists are
    paired with the ids of the newly added texts by position.

    :return: frame with columns ``id`` and ``value``
    """
    def qualified_questions(row):
        labels = parse_questions(row["вопрос"])
        return [row["программа"] + "-" + label for label in labels]

    per_text = main_file[["программа", "вопрос"]].apply(qualified_questions, axis=1)
    new_ids = text_table.new_indices
    records = [
        {"id":text_id, "value":label}
        for text_id, labels in zip(new_ids, per_text)
        for label in labels
    ]
    return pd.DataFrame.from_records(records)

In [40]:
def concat_question_cols(x):
    """Return ``[id, label]`` for one question row.

    The label is "<question_list>-<question_num>", followed by a space and
    ``question_letter`` when the letter is non-empty.
    """
    row_id = x["id"]
    label = x["question_list"] + "-" + str(x["question_num"])
    letter = x["question_letter"]
    if letter:
        label = label + (" " + letter)
    else:
        label = label + ""
    return [row_id, label]

In [41]:
def produce_media_df(main_file: pd.DataFrame, text_table: JoinedTable, keys=("zvuk", "audio")):
    """Pair every new text id with its media reference.

    :param keys: ``(source column in main_file, target column in the result)``
    :return: frame with columns ``id_text``, the target column and ``start``;
        ``start`` is not filled here
    """
    media = pd.DataFrame(columns=["id_text", keys[1], "start"])
    new_ids = text_table.new_indices
    media["id_text"] = new_ids
    # assigned as a Series, so values are matched to the ids by index label
    source_values = main_file[keys[0]]
    media[keys[1]] = source_values
    return media

In [42]:
# Shared pymystem3 analyzer instance; g_name() reads this module-level `m`.
m = Mystem()
# names = FileNames(main="data/undone/novolokti.xlsx")
def main(names: FileNames, geonames: GeoNames=None):
    """Import one batch of source files into the database.

    The steps run in a fixed order, because later steps need the ids
    assigned by earlier ones: geography (villages, regions, districts, geo
    rows), texts, informants, collectors, keywords, questions, audio.

    :param names: Paths of files that will be inserted into the dataframe
    :param geonames: district and region applied to every village; when
        omitted, both are parsed out of the "село" column of the main file
    """
    engine = sqlalchemy.create_engine(f"mysql+pymysql://{c.USER}:{c.PASSWORD}@{c.HOST}:{c.PORT}/{c.DB}")
    main_file = read_main_file(names.main)
    inform_file = read_sup_file(names.informators)
    collector_file = read_sup_file(names.collectors)
    text_df = produce_text_df(main_file)

    # geo operations
    villages = JoinedTable(tablename="g_villages", engine=engine)
    regions = JoinedTable(tablename="g_regions", engine=engine)
    districts = JoinedTable(tablename="g_districts", engine=engine)
    place_df, region_df, district_df = produce_region_district(
        (
            produce_place_df(main_file, inform_file, geonames=geonames) if geonames
            else produce_place_df_complex(main_file, inform_file)
        )
    )
    villages.join(place_df).drop_duplicates(
        subset=["village_name", "map_region", "map_district"]
    ).upload()
    regions.join(region_df).drop_duplicates().upload()
    districts.join(district_df).drop_duplicates().upload()

    # Geo rows are built for the villages added in this run only.
    # NOTE(review): texts from villages that were already in the table get no
    # geo id this way -- confirm that this is intended.
    geo_df = produce_geo_df(
        villages.new_entries,
        regions.dataframe,
        districts.dataframe
    )
    geo_table = JoinedTable("g_geo_text", engine=engine)
    geo_table.join(geo_df)
    text_df["geo_id"] = get_geo_ids(
        main_file,
        villages.new_entries,
        geo_table.new_entries
    )
    # keep only the new geo rows that some text actually refers to
    geo_table.dataframe = geo_table.dataframe.set_index("id").drop(
        list(set(geo_table.new_indices) - set(text_df["geo_id"])),
        axis=0
    ).reset_index()
    geo_table.upload()

    # text operations
    texts = JoinedTable(tablename="texts", engine=engine)
    texts.join(text_df)
    texts.upload()

    # inform operations
    inform_df = produce_inform_df(inform_file, place_df)
    inf_table = JoinedTable("informators", engine)
    inf_table.join(inform_df).upload()
    inf_pivot = produce_text_pivot(main_file, texts, "информант")
    t_i = JoinedRelation("t_i", engine)
    t_i.join(
        inf_pivot,
        inf_table.new_entries,
        suffixes=("_text", "_informator")
    ).upload()

    # collector operations
    collector_table = JoinedTable("collectors", engine)
    collector_df = produce_collector_df(collector_file)
    collector_table.join(collector_df)
    collector_table.dataframe.drop_duplicates(subset=["code", "name"], inplace=True)
    collector_table.upload()
    col_pivot = produce_text_pivot(main_file, texts, "собиратель")
    t_c = JoinedRelation("t_c", engine)
    t_c.join(
        col_pivot,
        collector_table.dataframe,
        suffixes=("_text", "_collector")
    ).upload()

    # keyword operations: texts are linked to keywords already in the table
    keyword_df = produce_list_pivot(main_file, texts)
    keyword_table = JoinedTable("keywords", engine)
    t_k = JoinedRelation("t_k", engine)
    t_k.join(
        keyword_df,
        keyword_table.dataframe,
        on=("value", "word"),
        suffixes=("_text", "_keyword")
    ).upload()

    # question operations: stored questions are rendered as
    # "<list>-<num>[ <letter>]" so they can be matched with the parsed labels
    question_pivot = produce_question_pivot(main_file, texts)
    questions = JoinedTable("questions", engine)
    quest_alias = questions.dataframe.apply(
        concat_question_cols,
        axis=1,
        result_type='expand'
    )
    quest_alias.columns = ["id", "question"]
    t_q = JoinedRelation("t_q", engine)
    t_q.join(
        question_pivot,
        quest_alias,
        on=("value", "question"),
        suffixes=("_text", "_question")
    ).upload()

    # audio operations
    audio_table = JoinedTable("t_audio", engine)
    # Drop only the rows that lack a text id or an audio reference. The
    # "start" column is never filled by produce_media_df(), so an
    # unrestricted dropna() would discard every row.
    # NOTE(review): "start" is uploaded as missing -- confirm t_audio allows it.
    audio_df = produce_media_df(main_file, texts).dropna(
        axis=0, subset=["id_text", "audio"]
    )
    audio_table.join(audio_df).upload()
    return

In [None]:
# Run the import for the default batch of files.
main(
    names=FileNames(
        main="data/undone/tblCards.csv",
        informators="data/undone/tblInformators.csv",
        collectors="data/undone/tblSobirately.csv",
    ),
    # GeoNames(district="Чериковская область", region="Могилевский район")
)

In [None]:
import sys
import os

# Command-line entry point: up to three paths, in the order of the FileNames
# fields (main, informators, collectors).
# NOTE(review): this looks meant for the notebook exported as a script; inside
# a notebook kernel sys.argv holds the kernel's own arguments -- confirm.
if __name__ == "__main__":
    paths = sys.argv[1:]
    if not 1 <= len(paths) <= 3:
        sys.exit("usage: <main file> [<informators file> [<collectors file>]]")
    for path in paths:
        assert os.path.isfile(path), f"{path} is not a file"

    # main() reads names.main / names.informators / names.collectors, so the
    # plain list of paths has to be wrapped in FileNames first.
    main(FileNames(*paths))
    sys.exit(0)

In [74]:
# Connection for the ad-hoc cells below. It is built from config.py, the same
# way main() builds it, so no credentials are written into the notebook.
# NOTE(review): the previous literal pointed at localhost:3306/folklore_v2 --
# confirm that config.py holds the same target.
engine = sqlalchemy.create_engine(f"mysql+pymysql://{c.USER}:{c.PASSWORD}@{c.HOST}:{c.PORT}/{c.DB}")

In [75]:
# import pandas as pd

# ids = pd.read_csv("data/ids.csv", sep=",", quotechar='\\', index_col=None)
# qs = pd.read_csv("data/qs.csv", sep=",", quotechar='\\', index_col=None)
# ids['text'] = ids['text'].apply(lambda x: x[:50])
# qs['text'] = qs['text'].apply(lambda x: x[:50])
# new = pd.merge(left=ids, right=qs, how='inner', left_on=["text"], right_on=['text'])[["id", "prog", "вопрос"]]
# new.columns = ["id", 'prog', 'quest']
# df = new[["prog", "quest"]].apply(
#     lambda x: list(
#         map(lambda y: x["prog"] + "-" + y, parse_questions(x["quest"]))
#     ),
#     axis=1
# )
# observations = []
# ids = new.id
# for idx, q_list in zip(ids, df):
#     observations += [{"id":idx, "value":n} for n in q_list]
# q_mapping = pd.DataFrame.from_records(observations)

# questions = JoinedTable("questions", engine)
# quest_alias = questions.dataframe.apply(
#     concat_question_cols,
#     axis=1,
#     result_type='expand'
# )
# quest_alias.columns = ["id", "question"]
# t_q = JoinedRelation("t_q", engine)
# t_q.join(
#     q_mapping,
#     quest_alias,
#     on=("value", "question"),
#     suffixes=("_text", "_question")
# )
# t_q.upload()

In [92]:
# main_file = read_main_file("data/undone/tblCards.csv",)
# keyword_table = JoinedTable("keywords", engine)
# texts = JoinedTable(tablename="texts", engine=engine)
# texts_other = texts.dataframe.copy()
# texts_other.raw_text = texts_other.raw_text.dropna().apply(lambda x: x[:65])
# main_file = main_file[["текст", "ключевые слова"]]
# main_file.columns = ["text", "keywords"]
# main_file.text = main_file.text.apply(lambda x: x[:65])
# newkw = pd.merge(left=texts_other, right=main_file, left_on=['raw_text'], right_on=["text"], how='inner')
# newkw.head(5)
# ids = newkw.id
# assert ids.shape[0] > 0
# observations = []
# for idx, word_list in zip(ids, newkw.keywords):
#     if not isinstance(word_list, str):
#         continue
#     word_list = word_list.split(", ")
#     observations += [{"id": idx, "value": word} for word in word_list]
# kws = pd.DataFrame.from_records(observations)
# t_k = JoinedRelation("t_k", engine)
# t_k.join(
#     kws,
#     keyword_table.dataframe,
#     on=("value", "word"),
#     suffixes=("_text", "_keyword")
# )
# t_k.upload()