# Install modules

In [31]:
! pip install numpy
! pip install Pillow

# unlike concurrent.futures.ProcessPoolExecutor, the multiprocess module
#   works with IPython (Jupyter)
! pip install multiprocess

# wikipedia module used for downloading texts from Wikipedia
! pip install wikipedia


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.1.2[0m[39;49m -> [0m[32;49m22.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.1.2[0m[39;49m -> [0m[32;49m22.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.1.2[0m[39;49m -> [0m[32;49m22.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.1.2[0m[39;49m -> [0m[32;49m22.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


# Scrape Source Texts from Wikipedia

In [4]:
import wikipedia
import re
import urllib.request
import time
import json
from concurrent import futures
import threading
from pathlib import Path

# Fetch the raw HTML of the Simple English Wikipedia "List of countries" page.
# At most 1,000,000 bytes are read; a longer page would be truncated.
with urllib.request.urlopen('https://simple.wikipedia.org/wiki/List_of_countries') as f:
    scraped = f.read(1000000)

# Collect the `title` attribute of the anchor tags matched by the pattern.
# This picks up every such link on the page, not only country links.
countries = re.findall(r'<a.*?title="(.*?)">.*?<\/a>', scraped.decode('utf-8'))
# Keep the slice between the first 'Afghanistan' and the first 'Zimbabwe'
# title (both inclusive). `list.index` raises ValueError if either is missing.
start = countries.index('Afghanistan')
end_incl = countries.index('Zimbabwe')
countries = countries[start: end_incl+1]

# Substrings marking link titles that must be dropped from the scraped list.
excluded_data = ['Change section']
def is_excluded(country):
    """Return True when `country` contains any of the `excluded_data` substrings."""
    for fragment in excluded_data:
        if fragment in country:
            return True
    return False

# Drop every scraped title that is_excluded() flags.
countries = [name for name in countries if not is_excluded(name)]

# Shared state for the download progress counter: `lock` guards `remaining`,
# which starts at the number of pages still to be fetched.
lock = threading.Lock()
remaining = len(countries)

def wiki_download(name):
    """Download one Wikipedia page and return its text as a list of words.

    Section headings of the form ``== Title ==`` are removed before the text
    is split on whitespace. After each download the shared `remaining`
    counter is decremented under `lock` and printed as a progress indicator.

    Args:
        name: exact page title (no redirects, no auto-suggestion).
    Returns:
        List of whitespace-separated tokens of the page content.
    """
    global remaining

    page = wikipedia.page(name, redirect=False, auto_suggest=False)
    without_headings = re.sub(r'==.*?==+', '', page.content)
    words = without_headings.replace('\n\n\n', '\n').strip().split()

    # The counter is shared between worker threads, hence the lock.
    with lock:
        remaining -= 1
        print(f'\r remaining: {remaining:03d} ', end='', flush=True)

    return words

# multithreading reduces download time
# from > 3 minutes (single-thread version) to < 20s
with futures.ThreadPoolExecutor(16) as executor:
    # executor.map returns a lazy iterator; materialise it here so that
    # `texts` stays a reusable list (and any download error surfaces now).
    texts = list(executor.map(wiki_download, countries))

# Make sure the output directory exists on a fresh checkout.
scraped_dir = Path('data_scraped')
scraped_dir.mkdir(parents=True, exist_ok=True)

# ensure_ascii=False emits raw non-ASCII characters, so the file encoding must
# be pinned to UTF-8 instead of relying on the platform default.
with open(scraped_dir / 'scraped_texts.json', 'wt', encoding='utf-8') as f:
    json.dump(texts, f, ensure_ascii=False)

 remaining: 000 

# Generate Images

- all scans need to have the same resolution (upscaling/downscaling)
- potential extension: extracting numbers from tables
- line breaks

In [6]:
from PIL import Image, ImageDraw, ImageFont, ImageFilter
import math
import random
import json
from typing import Iterable, Union, Tuple, List, Iterable, Generator
from pathlib import Path
import numpy as np
import multiprocess

# RGB colour used for the page background (and for the fill after rotation).
COLOR = (250, 250, 250)

# A rectangle given as ((x0, y0), (x1, y1)).
CoordinatesRect = Tuple[Tuple[int, int], Tuple[int, int]]

# Size limits (as fractions of the page size) and margins for a text block.
STANDARD_BLOCK = {
    'width_frac_interval': (0.2, 0.5),
    'height_frac_interval': (0.2, 0.5),
    'margin': (0.07, 0.07),
}

# Fixed-size block for the single number; shares the standard margin.
JUST_NUMBER_BLOCK = {
    'width_frac_interval': (0.15, 0.15),
    'height_frac_interval': (0.1, 0.1),
    'margin': STANDARD_BLOCK['margin'],
}

# Upper bound on random placement attempts per call.
MAX_ATTEMPTS = 500

class GenerateImage:
    """Generates a synthetic "scanned page": several randomly placed text blocks
    plus one small block holding a single number, followed by blur, noise,
    contrast/brightness change and a slight rotation.

    Fonts are read from the ``fonts`` directory as ``<Family>-<Variant>.ttf``
    where the variant is one of Regular, Italic, Bold, BoldItalic.
    """

    class PlacementError(Exception):
        """Raised when the number block cannot be placed without a collision."""

    def __init__(self, text_split, paper_width = 800, ratio = 2 ** 0.5,
                 seed: Union[int, None] = None) -> None:
        """
        Args:
            text_split: list of texts, each text being a list of words
            paper_width: page width in pixels
            ratio: height / width ratio of the page (sqrt(2) by default)
            seed: optional seed for the random generator (None == unseeded)
        """
        self.rng = np.random.default_rng(seed)
        self.text_split = text_split
        self.paper_width = paper_width
        self.paper_height = int(paper_width * ratio)
        self._font_cache = {}
        # reset_image() creates both self.image and the matching self.draw
        self.reset_image()
        self.font_families = self.get_fonts()
        self.font_variants = 'Regular Italic Bold BoldItalic'.split()
        
    def reset_image(self):
        """Starts a fresh blank page and re-binds the drawing context to it."""
        self.image = Image.new('RGB', size=(self.paper_width, self.paper_height), 
                               color=COLOR)  # A4
        # The drawing context is tied to one image object; without re-creating
        # it here all later text would be drawn onto the discarded image.
        self.draw = ImageDraw.Draw(self.image)
        
    def get_fonts(self):
        """ Get unique names of fonts in the font library 
        (w/o regular/italic/bold/bolditalic variants) """
        # sorted (instead of a plain list of the set) so that indexing font
        # families at random gives the same family for the same index in every
        # process; the iteration order of a set of strings is not stable
        return sorted({entry.name.split('-')[0] 
                       for entry in Path('fonts').iterdir() if entry.is_file()})

    def _get_font(self, font_family: str, variant: int, font_size: int):
        """Returns the font for the given family/variant/size, loading each
        combination from disk only once."""
        # setdefault keeps this usable even on instances built without __init__
        cache = self.__dict__.setdefault('_font_cache', {})
        key = (font_family, variant, int(font_size))
        if key not in cache:
            font_path = Path('fonts') / f'{font_family}-{self.font_variants[variant]}.ttf'
            cache[key] = ImageFont.truetype(str(font_path), int(font_size))  # type: ignore
        return cache[key]
        
    @staticmethod
    def value_in_interval(theta: float, imin: float, imax: float):
        """Linear interpolation: theta == 0 gives imin, theta == 1 gives imax."""
        assert imax >= imin
        assert 0 <= theta <= 1
        return imin + (imax - imin) * theta

    def get_block_rectangle(self, 
                              width_frac_interval: Tuple[float, float],
                              height_frac_interval: Tuple[float, float],
                              margin: Tuple[float, float]
                              ) -> CoordinatesRect:
        """Generates a random rectangle (for the text to be placed in) according to
        the input parameters. Does not check any intersections.
        Args:
            width_frac_interval: min and max width (e.g. 0.3 == 30 % of the page width)
            height_frac_interval: min and max height (e.g. 0.4 == 40 % of the page height)
            margin (Tuple[float, float]): horizontal and vertical margin, as 
                fractions of the page width / height, kept free along the page edges
        Returns:
            CoordinatesRect: coordinates of the rectangle
        """
        block_width_fraction = self.value_in_interval(theta=self.rng.random(),
                                                      imin=width_frac_interval[0], 
                                                      imax=width_frac_interval[1])
        block_width = int(self.paper_width * block_width_fraction)

        block_height_fraction = self.value_in_interval(theta=self.rng.random(),
                                                       imin=height_frac_interval[0], 
                                                       imax=height_frac_interval[1])
        block_height = int(self.paper_height * block_height_fraction)

        remaining_width = self.paper_width - block_width
        remaining_height = self.paper_height - block_height

        margin_x, margin_y = int(margin[0] * self.paper_width), int(margin[1] * self.paper_height)
        x_pos = self.rng.integers(margin_x, remaining_width - margin_x, endpoint=True)
        y_pos = self.rng.integers(margin_y, remaining_height - margin_y, endpoint=True)
        
        return (x_pos, y_pos), (x_pos+block_width, y_pos+block_height)
        
    def get_line_data(self, start_idx: int, text: List[str], font_size: int, 
                      font_family: str, block_width: int) -> Tuple[List[int], List[int]]:
        """Checks what part of the text can be printed to fit the block_width
        Args:
            start_idx (int): index in text with the first yet unprinted word
            text (List[str]): list of words (some of them could be printed on the line(s) above)
            font_size (int): font size passed to the font loader
            font_family (str): font family name (file name prefix in the fonts directory)
            block_width (int): width of line in pixels

        Returns:
            Tuple[List[int], List[int]]: 
                item [0] of each of the lists relates to the text[start_idx] word
                item [1] of each of the lists relates to the text[start_idx+1] word
                ...
                item [-1] of each of the lists relates to text[start_idx+len(list)-1],
                    the last word that fits the line
                ...
                the first list includes randomly chosen variants of the font 
                    (0==regular, ...), see self.font_variants
                the second list includes the width of each word (including the leading
                    space unless the word is the first on the line)
                both lists are empty when not even the first word fits
        """
        variants, widths = [], []
        committed_width = 0
        for idx in range(start_idx, len(text)):
            # regular with probability 0.6, the other three variants share 0.4
            variant = self.rng.multinomial(1, [0.6] + [0.4 / 3] * 3).nonzero()[0].item()
            font = self._get_font(font_family, variant, font_size)
            sep = '' if idx == start_idx else ' '
            text_width = self.draw.textlength(sep + text[idx], font=font)
            if committed_width + text_width > block_width:
                return variants, widths
            committed_width += text_width
            variants.append(variant)
            widths.append(text_width)
        # we can get here after all words from text are seen in the for loop
        return variants, widths
    
    def print_line(self, xy: Tuple[int, int], start_idx: int, text: List[str], font_size: int,
                   font_family: str, block_width: int) -> int:
        """Prints words from text (starting with the start_idx index) on the current line 
        (given by xy coordinates)
        Args:
            xy: coordinates of the top left corner of the current line
            start_idx: index in text with the first yet unprinted word
            text: list of words (some of them could be printed on the line(s) above)
            font_size: font size passed to the font loader
            font_family: font family name (file name prefix in the fonts directory)
            block_width: width of line in pixels
        Returns:
            number of words printed on the current line
        """
        variants, widths = self.get_line_data(start_idx, text, 
                                              font_size, font_family, block_width)
        current_x, y = xy 
        for i, variant in enumerate(variants):
            font = self._get_font(font_family, variant, font_size)
            sep = '' if i == 0 else ' '
            current_word = sep + text[start_idx + i]
            self.draw.text((current_x, y), current_word, 
                           fill=(40,40,40), font=font)
            current_x += widths[i]
        
        return len(variants)
            
    def print_block(self, text: List[str], coordinates: CoordinatesRect) -> str:
        """ Prints the block of text. We cannot use the ImageDraw.multiline_text method, which 
        requires the same font for all words.
        Args:
            text: List of words. The method selects a contiguous span and prints it.
            coordinates (CoordinatesRect): the printed text will be positioned within
                this (x0, y0), (x1, y1) rectangle
        Returns:
            str: text printed in the rectangle ('' when text is empty)
        """
        # nothing to print; rng.integers(0, 0) below would raise ValueError
        if len(text) == 0:
            return ''

        (x0, y0), (x1, y1) = coordinates
        block_width = x1 - x0 + 1
        
        # start somewhere in the first three quarters of the text
        max_start = int(0.75 * len(text))
        start_idx = 0 if len(text)==1 else self.rng.integers(0, max_start)
        current_idx = start_idx
        
        current_y = y0
        
        font_size = self.rng.integers(12, 18, endpoint=True)
        font_family_index = self.rng.integers(0, len(self.font_families))
        font_family = self.font_families[font_family_index]
        
        # line height; constant for the whole block as the font size is fixed
        add_y = int(font_size * 1.3)
        
        # stop as soon as the next line would exceed the bottom of the block
        while current_y + add_y <= y1:
            words = self.print_line((x0, current_y), current_idx, text,      
                                    font_size, font_family, block_width)
            
            current_idx += words
            current_y += add_y
            
            # len(text) is checked also inside print_line -> get_line_data
            if words == 0 or current_idx >= len(text):
                break
            
        return ' '.join(text[start_idx: current_idx])
    
    @staticmethod
    def intersects(block1: CoordinatesRect, block2: CoordinatesRect,
                   margin: Tuple[int, int]) -> bool:
        """checks whether block1 intersects block2 enlarged by margin (in pixels)
        on all four sides"""
        ((ax0, ay0), (ax1, ay1)) = block1
        ((bx0, by0), (bx1, by1)) = block2
        margin_x, margin_y = margin
        bx0, by0 = bx0 - margin_x, by0 - margin_y
        bx1, by1 = bx1 + margin_x, by1 + margin_y
        # rectangle A is fully to the left or fully to the right of B
        if (ax0 < bx0 and ax1 < bx0) or (ax0 > bx1 and ax1 > bx1):
            return False
        # rectangle A is fully to the top or fully to the bottom of B
        if (ay0 < by0 and ay1 < by0) or (ay0 > by1 and ay1 > by1):
            return False
        return True
    
    def get_standard_blocks_coordinates(self, 
                                        no_of_blocks: int) -> List[CoordinatesRect]:
        """ Returns the list of up to no_of_blocks rectangles, without any overlap.
        Please excuse the quick solution based on placing rectangles randomly and 
        checking for overlaps.
        """
        assert no_of_blocks >= 1
        # the pixel margin does not change between attempts
        margin = (int(STANDARD_BLOCK['margin'][0] * self.paper_width),
                  int(STANDARD_BLOCK['margin'][1] * self.paper_height))
        all_coordinates = []
        for _ in range(MAX_ATTEMPTS):  # the maximum number of attempts
            new_coords = self.get_block_rectangle(**STANDARD_BLOCK)
            # any([]) == False
            if any(self.intersects(new_coords, old_coords, margin)  
                   for old_coords in all_coordinates):
                continue
            all_coordinates.append(new_coords)
            if len(all_coordinates) == no_of_blocks:
                return all_coordinates
        assert len(all_coordinates) >= 1
        return all_coordinates
    
    def get_number_block_coordinates(self, all_coordinates: 
                                     List[CoordinatesRect]) -> CoordinatesRect:
        """find another smaller rectangle, to include just a single number, that
        does not collide with any of the all_coordinates rectangles
        Raises:
            GenerateImage.PlacementError (an Exception subclass): no free
                position found within MAX_ATTEMPTS random attempts
        """
        margin = (int(JUST_NUMBER_BLOCK['margin'][0] * self.paper_width),
                  int(JUST_NUMBER_BLOCK['margin'][1] * self.paper_height))
        for _ in range(MAX_ATTEMPTS):
            coords = self.get_block_rectangle(**JUST_NUMBER_BLOCK)
            if not any(self.intersects(coords, old_coords, margin)  
                       for old_coords in all_coordinates):
                return coords
        raise self.PlacementError("Unable to place the number block")
        
    def get_random_number_string(self) -> str:
        '''get a random number (as text) to be placed into the numeric rectangle;
        formatted either plain, with comma or with space as thousands separator'''
        number = self.rng.integers(10000, 999999, endpoint=True)
        variant = self.rng.integers(0, 3)
        if variant == 0:
            text = str(number)
        elif variant == 2:
            text = '{:,}'.format(number)
        else:
            text = '{:,}'.format(number).replace(',', ' ')
        return text
    
    def print_blocks(self, no_of_standard_blocks: int) -> List:
        """ Prints standard blocks and one numeric block
        Returns: A list regarding printed out blocks. Item[0] always relates to 
        the rectangle with a single number.
            item[0] is a dict including value (the printed number as text)
                and coordinates (x0, y0, x1, y1 attributes) of the rectangle
                with the number
            item[1], ..., item[-1] include a str (texts printed in respective
                rectangles)
        """
        while True:  # loop until numeric block placed
            standard_results = []
            std_coordinates = self.get_standard_blocks_coordinates(no_of_standard_blocks)
            for coords in std_coordinates:
                text_idx = self.rng.integers(0, len(self.text_split))
                text = self.text_split[text_idx]
                res = self.print_block(text, coords)
                standard_results.append(res)
            
            # add a single plain numeric block
            text = self.get_random_number_string().split()
            try:
                nb_coordinates = self.get_number_block_coordinates(std_coordinates)
            except self.PlacementError:
                # unable to place the numeric block: start over on a blank page.
                # Only the placement failure is retried; any other error
                # propagates instead of being retried forever.
                self.reset_image()
                continue

            numeric_result = self.print_block(text, nb_coordinates)
            numeric_dict = dict(value=numeric_result, 
                                x0=int(nb_coordinates[0][0]),
                                y0=int(nb_coordinates[0][1]),
                                x1=int(nb_coordinates[1][0]),
                                y1=int(nb_coordinates[1][1]))
            return [numeric_dict] + standard_results
        
    def process_img_deterministic(self, blur, noise, contrast, offset, rotate):
        """Applies, in this order: Gaussian blur, contrast scaling and brightness
        offset, additive noise, rotation. Replaces self.image with the result.
        Args:
            blur: radius of the Gaussian blur
            noise: second argument of Image.effect_noise (noise strength)
            contrast: multiplier applied to pixel values
            offset: value added to pixel values
            rotate: rotation angle passed to Image.rotate
        """
        self.image = self.image.filter(ImageFilter.GaussianBlur(radius=blur))
        noise_image = Image.effect_noise(self.image.size, noise)
        # centre the noise around zero and add a channel axis for broadcasting
        noise_arr = np.asarray(noise_image).astype(float)[:, :, None] - 128
        image_arr = np.asarray(self.image).astype(float)
        image_arr = (image_arr * contrast) + offset
        res_arr = (noise_arr + image_arr).clip(0, 255)
        self.image = Image.fromarray(res_arr.astype(np.uint8))
        self.image = self.image.rotate(rotate, 
                                       fillcolor=COLOR,
                                       resample=Image.Resampling.BICUBIC)  # type: ignore

    def process_image(self) -> float:
        """Applies process_img_deterministic with randomly drawn parameters.
        Returns: the rotation angle that was applied
        """
        rotation = self.rng.uniform(-2,2)
        self.process_img_deterministic(blur=self.rng.uniform(0, 0.3),
                                       noise=self.rng.uniform(1,5),
                                       contrast=self.rng.uniform(0.95, 1.05),
                                       offset=self.rng.uniform(-10,10),
                                       rotate=rotation)
        return rotation

    def save_image(self, path):
        """Saves the current image to path; returns self to allow chaining."""
        self.image.save(path)
        return self
    
    def __call__(self, path, blocks: Union[int, Tuple[int, int]]):
        """Generates one page and saves it to path.
        Args:
            path: output image path
            blocks: number of standard blocks, or an inclusive (min, max)
                pair to draw the number from
        Returns:
            the list returned by print_blocks, with the applied rotation
            added to item[0] under the 'rotation' key
        """
        if isinstance(blocks, Iterable):
            assert len(blocks) == 2
            blocks = self.rng.integers(blocks[0], blocks[1], endpoint=True)
        # process_image() replaces self.image, which detaches self.draw from it;
        # starting from a fresh page keeps repeated calls on one instance correct
        self.reset_image()
        result = self.print_blocks(blocks)
        rotation = self.process_image()
        result[0]['rotation'] = float(rotation)
        self.save_image(path)      
        return result
    
def process_one(num_idx, display_image=False):
    """Generates and saves one image.

    Loads the word lists from data_scraped/scraped_texts.json, renders a page
    with 2 to 4 standard blocks plus the number block and saves it as
    blocks_images/img<num_idx:04d>.jpg.

    Args:
        num_idx: index used in the output file name
        display_image: when True, the image is also shown via display()
    Returns:
        the block description list returned by the GenerateImage call
    """
    # JSON is read as UTF-8 explicitly rather than with the platform default
    with open(Path('data_scraped') / 'scraped_texts.json', 'rt', encoding='utf-8') as f:
        texts = json.load(f)
    path = Path('blocks_images') / f'img{num_idx:04d}.jpg'
    # exist_ok: several worker processes may reach this line at the same time
    path.parent.mkdir(parents=True, exist_ok=True)
    genimg = GenerateImage(texts)
    result = genimg(path=path, blocks=(2, 4))
    if display_image:
        display(genimg.image)
    return result

# process_one(0)

from multiprocessing import cpu_count
import multiprocess

# Number of images (and matching JSON description files) to generate.
GENERATED_IMAGES_COUNT = 100

# One worker process per CPU core; results come back in input order,
# so results[i] belongs to image index i.
cores = cpu_count()
with multiprocess.Pool(cores) as pool:
    results = pool.map(process_one, range(GENERATED_IMAGES_COUNT))

# Make sure the output directory exists on a fresh checkout.
texts_dir = Path('blocks_texts')
texts_dir.mkdir(parents=True, exist_ok=True)

for i, result in enumerate(results):
    path = texts_dir / f'{i:04d}.json'
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding is
    # pinned to UTF-8 instead of relying on the platform default
    with open(path, 'wt', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False)

In [None]:
import os

# SECURITY / FIXME: an API key is hardcoded in the notebook. Anyone with access
# to this file can use it; the key should be rotated and the literal removed.
# Until then the API_KEY environment variable, when set and non-empty,
# takes precedence over the committed value.
API_KEY = os.environ.get('API_KEY') or 'K88811114388957'