In [1]:
import dask.bag as db
import json

folder = 'CL_Cup IT_Data_Scince_секция_кейс_VK_датасет.zip'


def _read_jsonl_from_archive(url):
    """Lazily read one JSON-lines member of the `folder` zip into a dask bag.

    `url` is a `zip://<member>` path resolved inside the archive named by
    `folder`; every text line is parsed with `json.loads`.
    """
    raw_lines = db.read_text(
        url,
        storage_options={'fo': folder},
        encoding='Windows-1251',
    )
    return raw_lines.map(json.loads)


test_data = _read_jsonl_from_archive('zip://ranking_test.jsonl')
train_data = _read_jsonl_from_archive('zip://ranking_train.jsonl')

In [2]:
# Materialise both lazy bags as in-memory dataframes (test first, then train).
test_df, train_df = (
    bag.to_dataframe().compute() for bag in (test_data, train_data)
)

In [3]:
import pandas as pd

# One row per (post, comment): explode the list of comment dicts, lift the
# 'text' and 'score' fields into their own columns, then drop the raw dicts.
test = (
    test_df
    .explode(column='comments')
    .assign(
        comment=lambda frame: frame.comments.map(lambda dic: dic['text']),
        score=lambda frame: frame.comments.map(lambda dic: dic['score']),
    )
    .drop(columns=['comments'])
)
test.head()

Unnamed: 0,text,comment,score
0,"iOS 8.0.1 released, broken on iPhone 6 models,...",I&#x27;m still waiting for them to stabilize w...,
0,"iOS 8.0.1 released, broken on iPhone 6 models,...","For those who upgraded, no need to do a restor...",
0,"iOS 8.0.1 released, broken on iPhone 6 models,...",Upgraded shortly after it was released and suf...,
0,"iOS 8.0.1 released, broken on iPhone 6 models,...",I think they were under a lot of pressure on t...,
0,"iOS 8.0.1 released, broken on iPhone 6 models,...",Fix for those who already updated: http:&#x2F...,


In [4]:
# One row per (post, comment): explode the list of comment dicts, lift the
# 'text' and 'score' fields into their own columns, then drop the raw dicts.
train = (
    train_df
    .explode(column='comments')
    .assign(
        comment=lambda frame: frame.comments.map(lambda dic: dic['text']),
        score=lambda frame: frame.comments.map(lambda dic: dic['score']),
    )
    .drop(columns=['comments'])
)
train.head()

Unnamed: 0,text,comment,score
0,How many summer Y Combinator fundees decided n...,Going back to school is not identical with giv...,0
0,How many summer Y Combinator fundees decided n...,There will invariably be those who don't see t...,1
0,How many summer Y Combinator fundees decided n...,For me school is a way to be connected to what...,2
0,How many summer Y Combinator fundees decided n...,I guess it really depends on how hungry you ar...,3
0,How many summer Y Combinator fundees decided n...,I know pollground decided to go back to school...,4


In [5]:
import nltk
# NLTK resources fetched for the preprocessing below, in download order.
nltk_resources = [
    'punkt',
    'stopwords',
    'wordnet',
    'omw-1.4',
    'averaged_perceptron_tagger',
]
download_results = [nltk.download(resource) for resource in nltk_resources]
# Show the status of the last download as the cell output.
download_results[-1]

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\user\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\user\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\user\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     C:\Users\user\AppData\Roaming\nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     C:\Users\user\AppData\Roaming\nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [11]:
import re
from nltk.corpus import stopwords
from nltk import word_tokenize
from nltk.stem import PorterStemmer
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet

class preprocessingWrapper:

  """
    Text-preprocessing pipeline for the EDA and ML steps that follow.

    An instance is called on a dataframe and returns the chosen text column
    lower-cased, stripped of punctuation, emoji-like symbols and English stop
    words, tokenized, and finally stemmed or lemmatized.
  """

  # Punctuation to delete. The hyphen is deliberately last: written as
  # ".-^" it formed the range U+002E..U+005E and silently removed digits,
  # '/', '[', '\\', ']' and more.
  _SPEC_CHARS = re.compile(r"[$&+,:;=?@#|'<>.^*()%!-]")

  # Emoji / pictograph ranges to delete.
  _UNICODE_CHARS = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # flags (regional indicator symbols)
    "\U00002500-\U00002BEF"  # box drawing, shapes, misc. symbols, arrows
    "\U00002702-\U000027B0"  # dingbats
    # NOTE(review): this range is very wide (U+24C2..U+1F251) and therefore
    # also deletes e.g. CJK text; kept as-is to preserve existing behaviour.
    "\U000024C2-\U0001F251"
    "\U0001f926-\U0001f937"
    "\U00010000-\U0010ffff"  # every code point outside the BMP
    "\u2640-\u2642"
    "\u2600-\u2B55"
    "\u200d"  # zero width joiner
    "\u23cf"
    "\u23e9"
    "\u231a"
    "\ufe0f"  # variation selector-16
    "\u3030"
    "\u2705"
    "]+",
    re.UNICODE
  )

  @staticmethod
  def pos_tagger(treebank_tag: str):
    """ Map a Penn Treebank tag to the matching WordNet POS constant, or None """
    # Only the first letter of the tag decides the WordNet category.
    wordnet_pos_by_prefix = {
      'J': wordnet.ADJ,
      'V': wordnet.VERB,
      'N': wordnet.NOUN,
      'R': wordnet.ADV,
    }
    return wordnet_pos_by_prefix.get(treebank_tag[:1])

  @staticmethod
  def _build_stop_words_pattern(words):
    """ Compile a regex that matches any of `words` as a whole word """
    # Deduplicate, drop empties, and order deterministically (longest first).
    unique_words = sorted(
      {word for word in words if word}, key=lambda word: (-len(word), word)
    )
    if not unique_words:
      # Nothing to remove: a pattern that can never match.
      return re.compile(r'(?!)')
    # Both boundaries must be raw strings; a plain '\b' is a backspace char.
    return re.compile(
      r'\b(?:' + '|'.join(map(re.escape, unique_words)) + r')\b'
    )

  @staticmethod
  def _apply_elementwise(column, func, name):
    """ Apply `func` to every element of a pandas or dask series """
    if isinstance(column, pd.Series):
      # pandas would forward `meta` to `func` as a keyword argument.
      return column.apply(func)
    # Anything else is treated as a dask series, which takes `meta` to
    # describe the (name, dtype) of the result.
    return column.apply(func, meta=(name, 'object'))

  def __init__(self):
    self.stemmer = PorterStemmer()
    self.wordnet_lemmatizer = WordNetLemmatizer()
    self.spec_chars = self._SPEC_CHARS
    self.unicode_chars = self._UNICODE_CHARS

    # Punctuation is stripped from the text before stop words are removed,
    # so forms such as "don't" appear as "dont"; match both spellings.
    raw_stop_words = set(stopwords.words('english'))
    stripped_stop_words = {
      self.spec_chars.sub('', word) for word in raw_stop_words
    }
    self.stop_words = self._build_stop_words_pattern(
      raw_stop_words | stripped_stop_words
    )

  def stem(self, tokens):
    """ Return the Porter stem of every token """
    return [self.stemmer.stem(token) for token in tokens]

  def lemma(self, tokens):
    """
      Return the WordNet lemma of every alphabetic token; other tokens are
      passed through unchanged. Each token is POS-tagged on its own, and the
      lemmatizer's default POS is used when the tag has no WordNet category.
    """
    lemmatize = self.wordnet_lemmatizer.lemmatize
    lemmas = []
    for token in tokens:
      if not token.isalpha():
        lemmas.append(token)
        continue
      # Tag once per token (the tag was previously computed twice).
      wordnet_pos = self.pos_tagger(nltk.pos_tag([token])[0][1])
      if wordnet_pos:
        lemmas.append(lemmatize(token, pos=wordnet_pos))
      else:
        lemmas.append(lemmatize(token))
    return lemmas

  def __call__(
    self, df: pd.DataFrame, column_for_cleaning: str, op: str
  ):
    """
      Clean one text column of a dataframe and return it as token lists.

      df: pd.DataFrame (a dask DataFrame is accepted as well)
        - Dataframe holding the column to clean
      column_for_cleaning: str
        - Name of the column to clean
      op: str ('stem'/'lemma')
        - Which operation to apply to the tokens at the end

      Returns a series of token lists named after `column_for_cleaning`
      (lazy when `df` is a dask DataFrame).

      Raises ValueError if `op` is neither 'stem' nor 'lemma'.
    """
    if op not in ('stem', 'lemma'):
      raise ValueError(
        'op принимает значения "stem" или "lemma"'
      )

    column = df[column_for_cleaning].str.lower()

    # Order matters: punctuation, then emoji-like symbols, then stop words.
    for pattern in (self.spec_chars, self.unicode_chars, self.stop_words):
      column = column.str.replace(pattern, '', regex=True)

    # tokenization
    tokens = self._apply_elementwise(
      column, word_tokenize, column_for_cleaning
    )

    # stemming or lemmatization
    token_op = self.stem if op == 'stem' else self.lemma
    return self._apply_elementwise(tokens, token_op, column_for_cleaning)

In [12]:
import dask.dataframe as ddf

# Partition the exploded training frame so the cleaning can run per partition.
trained_ddf = ddf.from_pandas(train, npartitions=12)

# Build the (still lazy) lemmatized version of the 'comment' column.
preprocesser = preprocessingWrapper()
trained_posts_lemmatized = preprocesser(
    df=trained_ddf, column_for_cleaning='comment', op='lemma'
)

In [8]:
from dask.diagnostics import ProgressBar
# Register a dask progress bar so subsequent computations report progress.
pbar = ProgressBar()
pbar.register()

In [14]:
# Materialise the lazy result. The name is rebound to the computed object, so
# only call .compute() while it is still lazy; this keeps the cell safe to
# re-run without restarting from the cell that built the lazy series.
if hasattr(trained_posts_lemmatized, 'compute'):
    trained_posts_lemmatized = trained_posts_lemmatized.compute()

[                                        ] | 0% Completed | 2.73 s ms


KeyboardInterrupt: 