### Импорт библиотек: выполнить 1 раз в начале работы

In [1]:
!pip install bs4
!pip install marshmallow
!pip install pandas

Collecting marshmallow
  Downloading marshmallow-3.13.0-py2.py3-none-any.whl (47 kB)
     |████████████████████████████████| 47 kB 3.1 MB/s 
Installing collected packages: marshmallow
Successfully installed marshmallow-3.13.0


In [2]:
import pandas as pd
import os
import re
import shlex
from bs4 import BeautifulSoup
from abc import ABCMeta, abstractmethod
from marshmallow import Schema, ValidationError, fields, validates, validates_schema
from collections.abc import Callable
from typing import List, Union
from collections import OrderedDict

### Подгрузить файлы с интервью (левая панель в google colab)
### выполнить все ячейки ниже
### выполнить функцию main (см. ниже)

## Converters

In [3]:
class Singleton(type):
    """
    Metaclass that keeps one instance per class: the first instantiation
    builds the object, every later one gets that same object back
    (the arguments of later calls are not used)
    """
    _instances = {}

    def __call__(cls, *args, **kwargs):
        # Guard clause: an instance of this class was already created
        if cls in cls._instances:
            return cls._instances[cls]
        created = super().__call__(*args, **kwargs)
        cls._instances[cls] = created
        return created

In [4]:
def check_args(func:Callable) -> Callable:
    """
    Decorator for converter ``convert`` methods that validates the input
    file before the conversion runs. It ensures that the file
    1. exists inside the converter's directory (``self.curdir``)
    2. has the extension the converter expects (``self.old``)
    3. contains the converter's prefix (``self.prefix``) in its name

    The decorated method is then called with the file name *without* its
    extension.

    :param func Callable: the ``convert`` method to wrap
    :returns: the checking wrapper
    :rtype: Callable
    :raises OSError: (from the wrapper) if the file does not exist
    :raises ValueError: (from the wrapper) on a wrong extension or a
        missing prefix
    """
    from functools import wraps

    @wraps(func)  # keep the name and docstring of the decorated method
    def wrapper(self, filename:str):
        filepath = os.path.join(self.curdir, filename)
        if not os.path.isfile(filepath):
            # The caller only prints the message, so name the file in it
            raise OSError(f"File not found: {filename}")
        stem, extension = os.path.splitext(filename)
        if extension != self.old:
            raise ValueError(f"Invalid file extension: {filename}")
        if self.prefix not in stem:
            raise ValueError(f"The filename does not contain the required prefix {self.prefix}")
        return func(self, stem)
    return wrapper

In [5]:
class AbstractConverter():
    """Common base of the converters: it only stores the conversion settings"""
    __slots__ = ("prefix", "curdir", "old", "new")

    def __init__(self, old:str, new:str, prefix:str="") -> None:
        # Both extensions are kept with their leading dot
        self.old, self.new = ["." + ext for ext in (old, new)]
        # Directory the process was in when the converter was created
        self.curdir = os.getcwd()
        self.prefix = prefix

    def convert(self, filename:str) -> None:
        """Placeholder that does nothing; subclasses supply the conversion"""
        return None

In [17]:
class Docx2XmlConverter(AbstractConverter, metaclass=Singleton):
    """Converter from docx to xml"""
    @check_args
    def convert(self, filename:str) -> str:
        """Copy the main document part of a docx file into an xml file

        A docx file is a zip archive. Only its ``word/document.xml`` member
        is read and written to ``<filename>.xml`` in the converter's
        directory, so nothing else is unpacked and the result does not
        depend on the current working directory of the process.

        :param filename str: name of the input file; ``check_args`` passes
            it on without the extension
        :returns: name of the produced file
        :rtype: str
        :raises zipfile.BadZipFile: if the input is not a zip archive
        :raises KeyError: if the archive has no ``word/document.xml``
        """
        import zipfile

        source = os.path.join(self.curdir, filename + self.old)
        new_path = os.path.join(self.curdir, filename + self.new)
        with zipfile.ZipFile(source) as archive:
            payload = archive.read("word/document.xml")
        with open(new_path, "wb") as target:
            target.write(payload)
        return filename + self.new
    
class Doc2XmlConverter(AbstractConverter, metaclass=Singleton):
    """Converter from doc to docbook xml format"""
    @check_args
    def convert(self, filename:str) -> str:
        """Run ``antiword -x db`` on a doc file and store its output

        The program is started directly (no shell) with its standard output
        redirected into ``<filename>.xml`` in the converter's directory.
        A failed run raises instead of silently leaving an empty file.

        :param filename str: name of the input file; ``check_args`` passes
            it on without the extension
        :returns: name of the produced file
        :rtype: str
        :raises FileNotFoundError: if the antiword executable is not found
        :raises subprocess.CalledProcessError: if antiword exits with an
            error status
        """
        import subprocess

        source = os.path.join(self.curdir, filename + self.old)
        new_path = os.path.join(self.curdir, filename + self.new)
        with open(new_path, "wb") as target:
            subprocess.run(["antiword", "-x", "db", source],
                           stdout=target, check=True)
        return filename + self.new
    
class Doc2TXtConverter(AbstractConverter, metaclass=Singleton):
    """Converter from doc to txt format"""
    @check_args
    def convert(self, filename:str) -> str:
        """Run ``antiword -t`` on a doc file and store its output

        The program is started directly (no shell) with its standard output
        redirected into ``<filename>.txt`` in the converter's directory.
        A failed run raises instead of silently leaving an empty file.

        :param filename str: name of the input file; ``check_args`` passes
            it on without the extension
        :returns: name of the produced file
        :rtype: str
        :raises FileNotFoundError: if the antiword executable is not found
        :raises subprocess.CalledProcessError: if antiword exits with an
            error status
        """
        import subprocess

        source = os.path.join(self.curdir, filename + self.old)
        new_path = os.path.join(self.curdir, filename + self.new)
        with open(new_path, "wb") as target:
            subprocess.run(["antiword", "-t", source],
                           stdout=target, check=True)
        return filename + self.new

In [18]:
def convertermaker(old:str, new:str, prefix:str="") -> AbstractConverter:
    """Factory function for converters

    The converter class is chosen by the concatenation of the two
    extensions (e.g. "doc" + "txt"). The converter classes are created
    through the Singleton metaclass, so a repeated request for the same
    class returns the instance built by the first request.

    :param old str: extension of the input files, without the dot
    :param new str: extension of the produced files, without the dot
    :param prefix str: text that the input file names must contain
    :returns: converter for the requested pair of file types
    :rtype: AbstractConverter
    :raises KeyError: if there is no converter for this pair
    """
    mapping = {
        "docxxml":Docx2XmlConverter,
        "docxml":Doc2XmlConverter,
        "doctxt":Doc2TXtConverter
    }
    # Only the lookup can mean "unknown pair"; errors raised while building
    # the converter are no longer relabelled as KeyError
    converter_class = mapping.get(f"{old}{new}")
    if converter_class is None:
        raise KeyError(f"No converter for the file types {old} & {new}")
    return converter_class(old=old, new=new, prefix=prefix)

## Validator

In [8]:
def validationmaker(true_vil: Union[List[str], str, None] = None,
                    true_year: Union[str, int, None] = None) -> Schema:
    """
    Factory function that defines a validation schema
    :param true_vil Union[List[str], str, None]: accepted village name
        spellings (a single name may be given as a plain string);
        None switches the check off
    :param true_year Union[str, int, None]: correct year, compared as a
        string; None switches the check off
    :returns: schema to load the object into
    :rtype: Schema
    """
    # A bare string would turn the membership test in vilval into a
    # substring search, so wrap it into a list first
    allowed_vils = [true_vil] if isinstance(true_vil, str) else true_vil

    class ValidationSchema(Schema):
        prog = fields.String(required=True)
        @validates("prog")
        def progval(self, data, **kwargs):
            # NOTE(review): re.match only anchors at the start, so characters
            # after the first valid run are not inspected - confirm whether a
            # full match is wanted
            if not re.match(r"[IVXa]+", data):
                raise ValidationError(f"Incorrect symbols in program: {data}")

        quest = fields.String(required=True)
        @validates("quest")
        def questval(self, data, **kwargs):
            if not re.match(r"[0-9.,а-я]+", data):
                raise ValidationError(f"Incorrect symbols in question: {data}")
            # The lookahead also catches "доп" at the very end of the value,
            # where there is no following character to test
            if re.search(r"доп(?!\.)", data):
                raise ValidationError(f"Missing '.' in question: {data}")

        vil = fields.String(required=True)
        @validates("vil")
        def vilval(self, data, **kwargs):
            if allowed_vils is not None and data not in allowed_vils:
                raise ValidationError(f"Incorrect village: {data}")

        year = fields.String(required=True)
        @validates("year")
        def yearval(self, data, **kwargs):
            if true_year is not None and data != str(true_year):
                raise ValidationError(f"Incorrect year: {data}")

        sob1 = fields.String(required=True)
        sob2 = fields.String()
        sob3 = fields.String()
        sob4 = fields.String()
        main = fields.String(required=True)
        inf1 = fields.String(required=True)
        inf2 = fields.String()
        inf3 = fields.String()
        inf4 = fields.String()
        @validates_schema
        def codeval(self, data, **kwargs):
            # Codes may only contain capital Cyrillic letters, digits,
            # "-" and "?"
            for key in ["sob1", "sob2", "sob3", "sob4", "inf1", "inf2", "inf3", "inf4"]:
                if key in data and re.search(r"[^А-Я\-0-9?]", data[key]):
                    raise ValidationError(f"Incorrect symbols in code {data[key]}")
    return ValidationSchema()

## Parsers

In [9]:
def extension_check(extension:str) -> Callable:
    """
    Decorator factory for parser ``parse`` methods. Before the decorated
    method runs, the wrapper ensures that the file
    1. has the correct extension (txt/xml)
    2. exists (the name is resolved against the current working directory)

    :param extension str: required extension, including the leading dot
    :returns: decorator for methods of the form ``method(self, file, ...)``
    :rtype: Callable
    :raises OSError: (from the wrapper) on a wrong extension or a missing
        file
    """
    from functools import wraps

    def inner_checker(func:Callable) -> Callable:
        @wraps(func)  # keep the name and docstring of the decorated method
        def wrapper(self, file, *args, **kwargs):
            # Take only the extension: the instance has to stay bound to
            # ``self`` so that the decorated method receives it unchanged
            file_ext = os.path.splitext(file)[1]
            if file_ext != extension:
                raise OSError(f"Invalid file type: {file_ext}")
            filepath = os.path.join(os.getcwd(), file)
            if not os.path.isfile(filepath):
                raise OSError(f"File not found: {file}")
            return func(self, file, *args, **kwargs)
        return wrapper
    return inner_checker

In [10]:
class Parser(metaclass=ABCMeta):
    """
    Base class of the parsers. Subclasses implement ``parse`` (file -> list
    of lines); the shared ``main`` groups those lines into db entries and
    writes them to an xlsx table.
    """
    @abstractmethod
    def parse(self, file:str) -> List[str]:
        """
        Read a file
        :param str file: name of the file to read
        :return: all lines (or paragraphs) of the file
        :rtype: List[str]
        """
        pass

    @staticmethod
    def separate(lines: List[str]) -> List[List[str]]:
        """
        Splits the lines into groups
        :param List[str] lines: all lines from a file
        :return: A list of line groups, corresponding to db entries
        :rtype: List[List[str]]
        """
        # The lines are glued with a NUL character, which is not expected in
        # text or xml input, so a "#" inside the text cannot split a record
        sep = "\x00"
        blank = re.compile(r"^[ ]+$")
        # transform lines with spaces only into empty lines
        corr_lines = ["" if blank.match(line) else line for line in lines]
        big_line = sep.join(corr_lines).strip().strip(sep)
        # Replace sequences of empty lines with one empty line
        big_line = re.sub(r"\x00{3,}", sep * 2, big_line)
        # Replace double backslashes with single ones
        big_line = big_line.replace("\\\\", "\\")
        # Split by empty lines, then back into the lines of each group
        return [group.split(sep) for group in big_line.split(sep * 2)]

    @staticmethod
    def analyze(raw:List[str]) -> dict:
        """
        Turn a single line group into a db entry

        Expected layout: line 1 "program - question", line 2
        "village ' year", line 3 interviewers separated by ", ", the last
        line informants separated by ", ", everything in between is the
        text (its lines are joined with "@").
        :param List[str] raw: lines that make a db entry
        :returns: A dict that can be used to create a pandas dataframe
        :rtype: dict
        :raises ValueError: if line 1 or line 2 does not split into exactly
            two parts
        """
        output_dict = OrderedDict()
        prog, quest = re.split(r"[–\-]", raw[0])
        # Cyrillic "Х" (U+0425) is replaced with the Latin letter "X"
        output_dict["prog"] = prog.strip().replace("\u0425", "X")
        output_dict["quest"] = quest.strip()
        vil, year = re.split(r"['’ʼ‘]", raw[1])
        output_dict["vil"], output_dict["year"] = vil.strip(), year.strip()
        # Up to four codes are stored; unused slots are empty strings
        authors = raw[2].split(", ")
        for idx, key in enumerate(["sob1", "sob2", "sob3", "sob4"]):
            output_dict[key] = authors[idx].strip() if idx < len(authors) else ""
        output_dict["main"] = "@".join(raw[3:-1])
        informers = raw[-1].split(", ")
        for idx, key in enumerate(["inf1", "inf2", "inf3", "inf4"]):
            output_dict[key] = informers[idx].strip() if idx < len(informers) else ""
        return output_dict

    def main(self, file:str, validation_schema:Union["Schema", None]) -> None:
        """
        Parse a file and save its entries as ``<file name>.xlsx``
        :param str file: name of the file to process
        :param validation_schema: schema used to validate the entries;
            a falsy value (e.g. None) skips the validation
        :raises Exception: if an entry cannot be parsed or validated; the
            details are printed first
        """
        all_lines = self.parse(file)
        try:
            objects = []
            for obj in self.separate(all_lines):
                # NOTE(review): groups of 3 lines or fewer are skipped
                # without any message - confirm this is intended
                if len(obj) > 3:
                    try:
                        objects.append(self.analyze(obj))
                    except Exception as err:
                        raise Exception(f"Error parsing object\n{obj}") from err
            if validation_schema:
                validation_schema.load(objects, many=True)
        except Exception as e:
            print("----")
            print(e)
            raise Exception(f"Error processing file {file} (see above)") from e
        filename, _ = os.path.splitext(file)
        dataframe = pd.DataFrame.from_records(objects)
        dataframe.to_excel("{}.xlsx".format(filename), index=False)
    
class TxtParser(Parser):
    """Parser for plain text files"""
    @extension_check(".txt")
    def parse(self, file:str) -> List[str]:
        """
        Read a text file
        :param str file: name of the txt file
        :return: the lines of the file, without line endings
        :rtype: List[str]
        """
        # "utf-8-sig" also drops a byte order mark at the start of the file
        with open(file, encoding="utf-8-sig") as handle:
            return handle.read().splitlines()
        
class DocBookParser(Parser):
    """Parser for xml files whose paragraphs are <para> elements"""
    @extension_check(".xml")
    def parse(self, file:str) -> List[str]:
        """
        Read an xml file
        :param str file: name of the xml file
        :return: the text of every <para> element, in document order
        :rtype: List[str]
        """
        with open(file, encoding="utf-8") as handle:
            markup = handle.read()
        soup = BeautifulSoup(markup, "html.parser")
        paragraphs = []
        for node in soup.find_all("para"):
            paragraphs.append(node.text)
        return paragraphs
    
class DocXmlParser(Parser):
    """Parser for xml files whose paragraphs are <w:p> elements"""
    @extension_check(".xml")
    def parse(self, file:str) -> List[str]:
        """
        Read an xml file
        :param str file: name of the xml file
        :return: the text of every <w:p> element, in document order
        :rtype: List[str]
        """
        with open(file, encoding="utf-8") as handle:
            markup = handle.read()
        soup = BeautifulSoup(markup, "html.parser")
        paragraphs = []
        for node in soup.find_all("w:p"):
            paragraphs.append(node.text)
        return paragraphs

In [11]:
def parsermaker(option:str) -> Parser:
    """Factory function to produce a parser of the required type
    :param option str: type of parser ("txt", "dbxml" or "docxml")
    :returns: parser of the specified type
    :rtype: Parser
    :raises KeyError: if there is no parser for the given type
    """
    mapping = {
        "txt":TxtParser,
        "dbxml":DocBookParser,
        "docxml":DocXmlParser
    }
    # Only the lookup is guarded (TypeError covers unhashable options), so
    # errors raised while building the parser are not relabelled as KeyError
    try:
        parser_class = mapping[option]
    except (KeyError, TypeError):
        raise KeyError(f"No parser for the type {option}") from None
    return parser_class()

## Функция main
### Принимает на вход расширение (doc/docx)
### Обрабатывает все файлы в данном расширении в текущей папке
### Для ошибочных файлов пишет ошибку и пропускает

In [19]:
def main(extension:str,
         true_year:Union[int, None]=None,
         true_vil:Union[List[str], None]=None) -> None:
    """Function to process the word files in the current directory

    Every file with the given extension is converted and parsed into an
    xlsx table. A file that fails is reported with ``print`` and skipped.

    :param extension str: doc or docx
    :param true_year int: year to validate (if needed)
    :param true_vil List[str]: village names to validate (if needed)
    :raises ValueError: if the extension is neither doc nor docx
    """
    import shutil

    # Install antiword only when it is really missing. The former shell test
    # lacked the space before "]", so it always failed and the installation
    # was attempted on every run.
    if shutil.which("antiword") is None:
        os.system("apt-get install -y antiword")
    # NOTE(review): validation is switched off here, so true_year and
    # true_vil have no effect until the next line is restored
    # validator = validationmaker(true_vil=true_vil, true_year=true_year)
    validator = None
    if extension == "docx":
        converter = convertermaker("docx", "xml")
        parser = parsermaker("docxml")
    elif extension == "doc":
        converter = convertermaker("doc", "txt")
        parser = parsermaker("txt")
    else:
        raise ValueError(f"Incorrect extension: {extension}")
    # Match the real extension, not any name that merely ends in these letters
    suffix = "." + extension
    for file in os.listdir(os.getcwd()):
        if not file.endswith(suffix):
            continue
        try:
            newfile = converter.convert(filename=file)
            parser.main(file=newfile, validation_schema=validator)
        except Exception as e:
            # Best effort: report the problem and go on with the next file
            print(e)

### Следующую ячейку выполнить один раз
### Скачать готовые таблицы на компьютер (левая панель в google colab)
### Если есть ошибочные файлы:
#### 1) Отредактировать
#### 2) Выполнить ячейку для удаления всех текущих файлов (вторая снизу)
#### 3) Подгрузить файлы заново, выполнить и скачать таблицы

In [13]:
# Process every docx file in the current directory.
# NOTE: main() currently sets its validator to None, so true_vil and
# true_year are accepted here but not checked.
main("docx", true_vil=["Источник Брязгун", "Чериков", "Веремейки"], true_year=2014)

----
Error parsing object
['XXIa-12 доп.', 'Чериков-2014', 'ОВБ, КВА', '[Евреи жили по всему городу Черикову, или были особые улицы, где было больше евреев?] [РАА одновременно:] По всему\\, нет-нет, нет. Бы\\ло бо\\льше, где хто. У них то\\же дома\\ свои\\ бы\\ли во всех. Ну, и ка\\ждый рабо\\тал… [Соб., перебивая: А чем занимались в основном?] Хто чем, хто где, как и сейча\\с... [Соб., перебивая: А ваш папа кто был?] Мой па\\па был кузнецо\\м. Вот, сра\\зу по\\сле войны\\ была\\ вро\\де ку\\зница, здесь стоя\\ла недалеко\\, на той стороне\\. Ну и он рабо\\тал кузнецо\\м. Приве… привози\\ли сюда\\, зна\\чит ремонти\\ровать, и он ремонти\\ровал. ', 'РАА']
Error processing file RAA&LMS&AMS_txt. .xml (see above)


In [None]:
!rm -r ./*

### Упаковать в архив все таблицы

In [23]:
!zip -r /content/file.zip . -i ./*.xlsx

  adding: L1_txt.xlsx (deflated 13%)
  adding: ShPI&SAS&BTM_txt.xlsx (deflated 1%)
  adding: SMZ_txt.xlsx (deflated 3%)
  adding: ZhLP_txt.xlsx (deflated 4%)
  adding: IVP_txt.xlsx (deflated 13%)
  adding: ChLI_txt.xlsx (deflated 13%)
  adding: MOG_txt.xlsx (deflated 1%)
  adding: LAA&S&IVP_txt.xlsx (deflated 1%)
  adding: L_txt.xlsx (deflated 13%)
  adding: GTL_txt.xlsx (deflated 2%)
  adding: SVV&VS&IVP_txt.xlsx (deflated 2%)
  adding: KEM&IVP_txt.xlsx (deflated 3%)
  adding: AMI_txt.xlsx (deflated 13%)
  adding: CA&BTM_txt.xlsx (deflated 7%)
  adding: Х_txt.xlsx (deflated 9%)
  adding: PVK_txt.xlsx (deflated 4%)
  adding: IF_txt.xlsx (deflated 13%)
  adding: BES_txt.xlsx (deflated 1%)
  adding: ITV_txt.xlsx (deflated 1%)
  adding: PMM&BTM_txt.xlsx (deflated 3%)
  adding: GNA_txt.xlsx (deflated 13%)
  adding: VSK&IVP_txt.xlsx (deflated 2%)
  adding: AMI&LO_txt.xlsx (deflated 13%)
  adding: IF&BTM_txt.xlsx (deflated 7%)
  adding: KNM_txt.xlsx (deflated 1%)
  adding: MEK&MDS_txt.xlsx (

### Скачать архив

In [24]:
# Google Colab only: download the archive /content/file.zip
from google.colab import files
files.download("/content/file.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

### Дополнительно

In [None]:
def merger(exclude=("merged.xlsx",)):
    """Stack all xlsx tables of the current directory into one dataframe

    The first row of every table (its header) is skipped and the columns are
    numbered, so the tables are combined by column position. Files are read
    in alphabetical order to make the result reproducible.

    :param exclude: file names to leave out; by default the table produced
        by an earlier merge, so that a re-run does not merge it into itself
    :returns: the combined table; an empty dataframe if there is no table
    :rtype: pd.DataFrame
    """
    targets = sorted(
        name for name in os.listdir(os.getcwd())
        if name.endswith(".xlsx") and name not in exclude
    )
    if not targets:
        return pd.DataFrame()
    frames = [pd.read_excel(target, header=None, skiprows=1) for target in targets]
    # One concat for all tables instead of re-copying the result per file
    return pd.concat(frames, axis=0)
# Combine the tables and save the result in the current directory
merged = merger()
merged.to_excel("merged.xlsx", index=False)
# Google Colab only: download the merged table
# (the path assumes the current directory is /content - TODO confirm)
from google.colab import files
files.download("/content/merged.xlsx")

## old part

In [None]:
# class fileTable:
#     def __init__(self, name, num):
#         self.name = name
#         self.num = num

#     def create(self):
#         new = open(file=self.name, mode="r", encoding="utf-8")
#         indexes = [i for i in range(self.num)]
#         self.table = pd.DataFrame(index=indexes, 
#                                     columns=["программа", "вопрос", \
#                                                         "село", "год", \
#                                                         "соб.1", "соб.2", \
#                                                         "соб.3", "соб.4", \
#                                                         "текст", "инф.1", "инф.2", \
#                                                         "инф.3", "инф.4", ""])
#         self.informants = set()
#         self.researchers = set()
#         for i in indexes:
#             line1 = new.readline().replace("\n","").split("-")
#             self.table.iloc[i, 0]=line1[0]
#             self.table.iloc[i, 1]=line1[1]
#             del line1
#             line1 = new.readline().replace("\n","")
#             if "'" in line1:
#                 line1 = line1.split("'") #обрабатываем 2 типа апострофов
#             elif "’" in line1:
#                 line1 = line1.split("’")
#             self.table.iloc[i, 2]=line1[0]
#             self.table.iloc[i, 3]=line1[1]
#             del line1
#             line1 = new.readline().replace("\n","").split(", ")
#             for n in range(len(line1)):
#                 if not line1[n] in self.researchers:
#                     self.researchers.add(line1[n])
#                 col = 4 + n
#                 self.table.iloc[i, col]=line1[n]
#             del col
#             del line1
#             newline = ""
#             line = ""
#             while newline != "\n":
#                 newline = new.readline()
#                 newline2 = newline.replace("\n","")
#                 line = line + newline2 + "@"
#                 if line[-2:] == "@@":
#                     break
#             line = line[:len(line)-2]
#             line2 = line[line.rfind("@") + 1:].split(", ") #extracting informants
#             line = line[:line.rfind("@")] # selecting text deleted +1
#             for n in range(len(line2)):
#                 if not line2[n] in self.informants:
#                     self.informants.add(line2[n])
#                 col = 9 + n
#                 self.table.iloc[i, col]=line2[n]#appending informants
#             self.table.iloc[i,8] = line #appending text to the column
#         new.close()

#     def visualize(self):
#         return self.table

#     def showHead(self):
#         print(self.table.head(10))

#     def write(self):
#         excelName = self.name.rstrip(".txt") + ".xlsx"
#         self.table.to_excel(excelName, index=False)
#         del excelName


In [None]:
# files = [x for x in filter(lambda x : ".txt" in x, os.listdir(os.getcwd()))]
# for file in files:
#     filename = file;
#     num = int(re.search(r'\d+', filename).group(0)) if re.search(r'\d+', filename) else 0
#     try:
#         this = fileTable(filename, num)
#         this.create()
#         this.write()
#     except Exception as e:
#         print(f"Exception has occured while processing the file {filename}:")
#         print(e)
#     del this